Merge branch 'master' into fix-flaky-test

This commit is contained in:
Alexey Milovidov 2024-06-25 23:56:29 +02:00
commit 8523ac10bb
60 changed files with 1156 additions and 437 deletions

View File

@ -37,6 +37,7 @@ RUN pip3 install \
tqdm==4.66.4 \
types-requests \
unidiff \
jwt \
&& rm -rf /root/.cache/pip
RUN echo "en_US.UTF-8 UTF-8" > /etc/locale.gen && locale-gen en_US.UTF-8

View File

@ -6,26 +6,297 @@ sidebar_label: NLP (experimental)
# Natural Language Processing (NLP) Functions
:::note
:::warning
This is an experimental feature that is currently in development and is not ready for general use. It will change in unpredictable backwards-incompatible ways in future releases. Set `allow_experimental_nlp_functions = 1` to enable it.
:::
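Since every function below is gated behind this setting, a session using them would typically begin by enabling it (a minimal sketch; the example string is illustrative):
```sql
-- Enable the experimental NLP functions for the current session
SET allow_experimental_nlp_functions = 1;

SELECT detectLanguage('ClickHouse is a column-oriented database management system.');
```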
## detectCharset
The `detectCharset` function detects the character set of the non-UTF8-encoded input string.
*Syntax*
``` sql
detectCharset('text_to_be_analyzed')
```
*Arguments*
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
*Returned value*
- A `String` containing the code of the detected character set
*Examples*
Query:
```sql
SELECT detectCharset('Ich bleibe für ein paar Tage.');
```
Result:
```response
┌─detectCharset('Ich bleibe für ein paar Tage.')─┐
│ WINDOWS-1252 │
└────────────────────────────────────────────────┘
```
## detectLanguage
Detects the language of the UTF8-encoded input string. The function uses the [CLD2 library](https://github.com/CLD2Owners/cld2) for detection, and it returns the 2-letter ISO language code.
The `detectLanguage` function works best when the input string contains over 200 characters.
*Syntax*
``` sql
detectLanguage('text_to_be_analyzed')
```
*Arguments*
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
*Returned value*
- The 2-letter ISO code of the detected language
Other possible results:
- `un` = unknown, cannot detect any language.
- `other` = the detected language does not have a 2-letter code.
*Examples*
Query:
```sql
SELECT detectLanguage('Je pense que je ne parviendrai jamais à parler français comme un natif. Where theres a will, theres a way.');
```
Result:
```response
fr
```
## detectLanguageMixed
Similar to the `detectLanguage` function, but `detectLanguageMixed` returns a `Map` of 2-letter language codes mapped to the percentage of that language in the text.
*Syntax*
``` sql
detectLanguageMixed('text_to_be_analyzed')
```
*Arguments*
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
*Returned value*
- `Map(String, Float32)`: The keys are 2-letter ISO codes and the values are a percentage of text found for that language
*Examples*
Query:
```sql
SELECT detectLanguageMixed('二兎を追う者は一兎をも得ず二兎を追う者は一兎をも得ず A vaincre sans peril, on triomphe sans gloire.');
```
Result:
```response
┌─detectLanguageMixed()─┐
│ {'ja':0.62,'fr':0.36} │
└───────────────────────┘
```
## detectProgrammingLanguage
Determines the programming language of the source code. It calculates all unigrams and bigrams of commands in the source code,
then looks them up in a marked-up dictionary containing per-language weights of those unigrams and bigrams, and returns the programming language with the highest total weight.
*Syntax*
``` sql
detectProgrammingLanguage('source_code')
```
*Arguments*
- `source_code` — String representation of the source code to analyze. [String](../data-types/string.md#string).
*Returned value*
- Programming language. [String](../data-types/string.md).
*Examples*
Query:
```sql
SELECT detectProgrammingLanguage('#include <iostream>');
```
Result:
```response
┌─detectProgrammingLanguage('#include <iostream>')─┐
│ C++ │
└──────────────────────────────────────────────────┘
```
## detectLanguageUnknown
Similar to the `detectLanguage` function, except the `detectLanguageUnknown` function works with non-UTF8-encoded strings. Prefer this version when your character set is UTF-16 or UTF-32.
*Syntax*
``` sql
detectLanguageUnknown('text_to_be_analyzed')
```
*Arguments*
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
*Returned value*
- The 2-letter ISO code of the detected language
Other possible results:
- `un` = unknown, cannot detect any language.
- `other` = the detected language does not have a 2-letter code.
*Examples*
Query:
```sql
SELECT detectLanguageUnknown('Ich bleibe für ein paar Tage.');
```
Result:
```response
┌─detectLanguageUnknown('Ich bleibe für ein paar Tage.')─┐
│ de │
└────────────────────────────────────────────────────────┘
```
## detectTonality
Determines the sentiment of text data. Uses a marked-up sentiment dictionary, in which each word has a tonality ranging from `-12` to `6`.
For each text, it calculates the average sentiment value of its words and returns it in the range `[-1,1]`.
:::note
This function is limited in its current form. Currently it makes use of the embedded emotional dictionary at `/contrib/nlp-data/tonality_ru.zst` and only works for the Russian language.
:::
*Syntax*
``` sql
detectTonality(text)
```
*Arguments*
- `text` — The text to be analyzed. [String](../data-types/string.md#string).
*Returned value*
- The average sentiment value of the words in `text`. [Float32](../data-types/float.md).
*Examples*
Query:
```sql
SELECT detectTonality('Шарик - хороший пёс'), -- Sharik is a good dog
detectTonality('Шарик - пёс'), -- Sharik is a dog
detectTonality('Шарик - плохой пёс'); -- Sharik is a bad dog
```
Result:
```response
┌─detectTonality('Шарик - хороший пёс')─┬─detectTonality('Шарик - пёс')─┬─detectTonality('Шарик - плохой пёс')─┐
│ 0.44445 │ 0 │ -0.3 │
└───────────────────────────────────────┴───────────────────────────────┴──────────────────────────────────────┘
```
## lemmatize
Performs lemmatization on a given word. Needs dictionaries to operate, which can be obtained [here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
*Syntax*
``` sql
lemmatize('language', word)
```
*Arguments*
- `language` — Language whose rules will be applied. [String](../data-types/string.md#string).
- `word` — Word that needs to be lemmatized. Must be lowercase. [String](../data-types/string.md#string).
*Examples*
Query:
``` sql
SELECT lemmatize('en', 'wolves');
```
Result:
``` text
┌─lemmatize("wolves")─┐
│ "wolf" │
└─────────────────────┘
```
*Configuration*
This configuration specifies that the dictionary `en.bin` should be used for lemmatization of English (`en`) words. The `.bin` files can be downloaded from
[here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
``` xml
<lemmatizers>
<lemmatizer>
<!-- highlight-start -->
<lang>en</lang>
<path>en.bin</path>
<!-- highlight-end -->
</lemmatizer>
</lemmatizers>
```
## stem
Performs stemming on a given word.
### Syntax
*Syntax*
``` sql
stem('language', word)
```
### Arguments
*Arguments*
- `language` — Language whose rules will be applied. Use the two-letter [ISO 639-1 code](https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes).
- `word` — Word that needs to be stemmed. Must be lowercase. [String](../data-types/string.md#string).
### Examples
*Examples*
Query:
@ -40,7 +311,7 @@ Result:
│ ['I','think','it','is','a','bless','in','disguis'] │
└────────────────────────────────────────────────────┘
```
### Supported languages for stem()
*Supported languages for stem()*
:::note
The stem() function uses the [Snowball stemming](https://snowballstem.org/) library; see the Snowball website for the current list of supported languages.
@ -76,53 +347,6 @@ The stem() function uses the [Snowball stemming](https://snowballstem.org/) libr
- Turkish
- Yiddish
## lemmatize
Performs lemmatization on a given word. Needs dictionaries to operate, which can be obtained [here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
### Syntax
``` sql
lemmatize('language', word)
```
### Arguments
- `language` — Language whose rules will be applied. [String](../data-types/string.md#string).
- `word` — Word that needs to be lemmatized. Must be lowercase. [String](../data-types/string.md#string).
### Examples
Query:
``` sql
SELECT lemmatize('en', 'wolves');
```
Result:
``` text
┌─lemmatize("wolves")─┐
│ "wolf" │
└─────────────────────┘
```
### Configuration
This configuration specifies that the dictionary `en.bin` should be used for lemmatization of English (`en`) words. The `.bin` files can be downloaded from
[here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
``` xml
<lemmatizers>
<lemmatizer>
<!-- highlight-start -->
<lang>en</lang>
<path>en.bin</path>
<!-- highlight-end -->
</lemmatizer>
</lemmatizers>
```
## synonyms
Finds synonyms for a given word. There are two types of synonym extensions: `plain` and `wordnet`.
@ -131,18 +355,18 @@ With the `plain` extension type we need to provide a path to a simple text file,
With the `wordnet` extension type we need to provide a path to a directory containing a WordNet thesaurus. The thesaurus must contain a WordNet sense index.
### Syntax
*Syntax*
``` sql
synonyms('extension_name', word)
```
### Arguments
*Arguments*
- `extension_name` — Name of the extension in which search will be performed. [String](../data-types/string.md#string).
- `word` — Word that will be searched in extension. [String](../data-types/string.md#string).
### Examples
*Examples*
Query:
@ -158,7 +382,7 @@ Result:
└──────────────────────────────────────────┘
```
### Configuration
*Configuration*
``` xml
<synonyms_extensions>
<extension>
@ -172,154 +396,4 @@ Result:
<path>en/</path>
</extension>
</synonyms_extensions>
```
## detectLanguage
Detects the language of the UTF8-encoded input string. The function uses the [CLD2 library](https://github.com/CLD2Owners/cld2) for detection, and it returns the 2-letter ISO language code.
The `detectLanguage` function works best when the input string contains over 200 characters.
### Syntax
``` sql
detectLanguage('text_to_be_analyzed')
```
### Arguments
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
### Returned value
- The 2-letter ISO code of the detected language
Other possible results:
- `un` = unknown, cannot detect any language.
- `other` = the detected language does not have a 2-letter code.
### Examples
Query:
```sql
SELECT detectLanguage('Je pense que je ne parviendrai jamais à parler français comme un natif. Where theres a will, theres a way.');
```
Result:
```response
fr
```
## detectLanguageMixed
Similar to the `detectLanguage` function, but `detectLanguageMixed` returns a `Map` of 2-letter language codes mapped to the percentage of that language in the text.
### Syntax
``` sql
detectLanguageMixed('text_to_be_analyzed')
```
### Arguments
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
### Returned value
- `Map(String, Float32)`: The keys are 2-letter ISO codes and the values are a percentage of text found for that language
### Examples
Query:
```sql
SELECT detectLanguageMixed('二兎を追う者は一兎をも得ず二兎を追う者は一兎をも得ず A vaincre sans peril, on triomphe sans gloire.');
```
Result:
```response
┌─detectLanguageMixed()─┐
│ {'ja':0.62,'fr':0.36} │
└───────────────────────┘
```
## detectLanguageUnknown
Similar to the `detectLanguage` function, except the `detectLanguageUnknown` function works with non-UTF8-encoded strings. Prefer this version when your character set is UTF-16 or UTF-32.
### Syntax
``` sql
detectLanguageUnknown('text_to_be_analyzed')
```
### Arguments
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
### Returned value
- The 2-letter ISO code of the detected language
Other possible results:
- `un` = unknown, cannot detect any language.
- `other` = the detected language does not have a 2-letter code.
### Examples
Query:
```sql
SELECT detectLanguageUnknown('Ich bleibe für ein paar Tage.');
```
Result:
```response
┌─detectLanguageUnknown('Ich bleibe für ein paar Tage.')─┐
│ de │
└────────────────────────────────────────────────────────┘
```
## detectCharset
The `detectCharset` function detects the character set of the non-UTF8-encoded input string.
### Syntax
``` sql
detectCharset('text_to_be_analyzed')
```
### Arguments
- `text_to_be_analyzed` — A collection (or sentences) of strings to analyze. [String](../data-types/string.md#string).
### Returned value
- A `String` containing the code of the detected character set
### Examples
Query:
```sql
SELECT detectCharset('Ich bleibe für ein paar Tage.');
```
Result:
```response
┌─detectCharset('Ich bleibe für ein paar Tage.')─┐
│ WINDOWS-1252 │
└────────────────────────────────────────────────┘
```
```

View File

@ -3820,3 +3820,43 @@ Result:
10. │ df │ │
└────┴───────────────────────┘
```
## displayName
Returns the value of `display_name` from [config](../../operations/configuration-files.md/#configuration-files), or the server's Fully Qualified Domain Name (FQDN) if it is not set.
**Syntax**
```sql
displayName()
```
**Returned value**
- Value of `display_name` from config or server FQDN if not set. [String](../data-types/string.md).
**Example**
The `display_name` can be set in `config.xml`. For example, on a server with `display_name` set to 'production':
```xml
<!-- It is the name that will be shown in the clickhouse-client.
By default, anything with "production" will be highlighted in red in query prompt.
-->
<display_name>production</display_name>
```
Query:
```sql
SELECT displayName();
```
Result:
```response
┌─displayName()─┐
│ production │
└───────────────┘
```

View File

@ -9,8 +9,8 @@ sidebar_label: CONSTRAINT
Constraints can be added or deleted using the following syntax:
``` sql
ALTER TABLE [db].name [ON CLUSTER cluster] ADD CONSTRAINT constraint_name CHECK expression;
ALTER TABLE [db].name [ON CLUSTER cluster] DROP CONSTRAINT constraint_name;
ALTER TABLE [db].name [ON CLUSTER cluster] ADD CONSTRAINT [IF NOT EXISTS] constraint_name CHECK expression;
ALTER TABLE [db].name [ON CLUSTER cluster] DROP CONSTRAINT [IF EXISTS] constraint_name;
```
See more on [constraints](../../../sql-reference/statements/create/table.md#constraints).
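A short sketch of how the new clauses are intended to be used (the table and constraint names here are hypothetical):
```sql
-- With IF NOT EXISTS, re-adding an existing constraint is not an error
ALTER TABLE t ADD CONSTRAINT IF NOT EXISTS c_positive CHECK x > 0;
ALTER TABLE t ADD CONSTRAINT IF NOT EXISTS c_positive CHECK x > 0;

-- With IF EXISTS, dropping a missing constraint is not an error
ALTER TABLE t DROP CONSTRAINT IF EXISTS c_removed_earlier;
```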

View File

@ -11,8 +11,8 @@ sidebar_label: "Манипуляции с ограничениями"
A constraint can be added or removed using the following queries:
``` sql
ALTER TABLE [db].name [ON CLUSTER cluster] ADD CONSTRAINT constraint_name CHECK expression;
ALTER TABLE [db].name [ON CLUSTER cluster] DROP CONSTRAINT constraint_name;
ALTER TABLE [db].name [ON CLUSTER cluster] ADD CONSTRAINT [IF NOT EXISTS] constraint_name CHECK expression;
ALTER TABLE [db].name [ON CLUSTER cluster] DROP CONSTRAINT [IF EXISTS] constraint_name;
```
These queries only add or remove metadata about the constraints of the `[db].name` table, so they execute instantly.

View File

@ -9,8 +9,8 @@ sidebar_label: 约束
Constraints can be added or deleted using the following syntax:
``` sql
ALTER TABLE [db].name ADD CONSTRAINT constraint_name CHECK expression;
ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
ALTER TABLE [db].name [ON CLUSTER cluster] ADD CONSTRAINT [IF NOT EXISTS] constraint_name CHECK expression;
ALTER TABLE [db].name [ON CLUSTER cluster] DROP CONSTRAINT [IF EXISTS] constraint_name;
```
See [constraints](../../../sql-reference/statements/create/table.mdx#constraints).

View File

@ -577,8 +577,7 @@ try
#if USE_SSL
CertificateReloader::instance().tryLoad(*config);
#endif
},
/* already_loaded = */ false); /// Reload it right now (initial loading)
});
SCOPE_EXIT({
LOG_INFO(log, "Shutting down.");

View File

@ -1540,6 +1540,8 @@ try
global_context->setMaxDictionaryNumToWarn(new_server_settings.max_dictionary_num_to_warn);
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);
/// Only for system.server_settings
global_context->setConfigReloaderInterval(new_server_settings.config_reload_interval_ms);
SlotCount concurrent_threads_soft_limit = UnlimitedSlots;
if (new_server_settings.concurrent_threads_soft_limit_num > 0 && new_server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
@ -1702,8 +1704,7 @@ try
/// Must be the last.
latest_config = config;
},
/* already_loaded = */ false); /// Reload it right now (initial loading)
});
const auto listen_hosts = getListenHosts(config());
const auto interserver_listen_hosts = getInterserverListenHosts(config());

View File

@ -880,8 +880,7 @@ void UsersConfigAccessStorage::load(
Settings::checkNoSettingNamesAtTopLevel(*new_config, users_config_path);
parseFromConfig(*new_config);
access_control.getChangesNotifier().sendNotifications();
},
/* already_loaded = */ false);
});
}
void UsersConfigAccessStorage::startPeriodicReloading()

View File

@ -0,0 +1,283 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Core/ServerSettings.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <Interpreters/castColumn.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
struct GroupConcatDataBase
{
UInt64 data_size = 0;
UInt64 allocated_size = 0;
char * data = nullptr;
void checkAndUpdateSize(UInt64 add, Arena * arena)
{
if (data_size + add >= allocated_size)
{
auto old_size = allocated_size;
allocated_size = std::max(2 * allocated_size, data_size + add);
data = arena->realloc(data, old_size, allocated_size);
}
}
void insertChar(const char * str, UInt64 str_size, Arena * arena)
{
checkAndUpdateSize(str_size, arena);
memcpy(data + data_size, str, str_size);
data_size += str_size;
}
void insert(const IColumn * column, const SerializationPtr & serialization, size_t row_num, Arena * arena)
{
WriteBufferFromOwnString buff;
serialization->serializeText(*column, row_num, buff, FormatSettings{});
auto string = buff.stringView();
insertChar(string.data(), string.size(), arena);
}
};
template <bool has_limit>
struct GroupConcatData;
template<>
struct GroupConcatData<false> final : public GroupConcatDataBase
{
};
template<>
struct GroupConcatData<true> final : public GroupConcatDataBase
{
using Offset = UInt64;
using Allocator = MixedAlignedArenaAllocator<alignof(Offset), 4096>;
using Offsets = PODArray<Offset, 32, Allocator>;
/// offset[i * 2] - beginning of the i-th row, offset[i * 2 + 1] - end of the i-th row
Offsets offsets;
UInt64 num_rows = 0;
UInt64 getSize(size_t i) const { return offsets[i * 2 + 1] - offsets[i * 2]; }
UInt64 getString(size_t i) const { return offsets[i * 2]; }
void insert(const IColumn * column, const SerializationPtr & serialization, size_t row_num, Arena * arena)
{
WriteBufferFromOwnString buff;
serialization->serializeText(*column, row_num, buff, {});
auto string = buff.stringView();
checkAndUpdateSize(string.size(), arena);
memcpy(data + data_size, string.data(), string.size());
offsets.push_back(data_size, arena);
data_size += string.size();
offsets.push_back(data_size, arena);
num_rows++;
}
};
template <bool has_limit>
class GroupConcatImpl final
: public IAggregateFunctionDataHelper<GroupConcatData<has_limit>, GroupConcatImpl<has_limit>>
{
static constexpr auto name = "groupConcat";
SerializationPtr serialization;
UInt64 limit;
const String delimiter;
public:
GroupConcatImpl(const DataTypePtr & data_type_, const Array & parameters_, UInt64 limit_, const String & delimiter_)
: IAggregateFunctionDataHelper<GroupConcatData<has_limit>, GroupConcatImpl<has_limit>>(
{data_type_}, parameters_, std::make_shared<DataTypeString>())
, serialization(this->argument_types[0]->getDefaultSerialization())
, limit(limit_)
, delimiter(delimiter_)
{
}
String getName() const override { return name; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & cur_data = this->data(place);
if constexpr (has_limit)
if (cur_data.num_rows >= limit)
return;
if (cur_data.data_size != 0)
cur_data.insertChar(delimiter.c_str(), delimiter.size(), arena);
cur_data.insert(columns[0], serialization, row_num, arena);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_data = this->data(place);
auto & rhs_data = this->data(rhs);
if (rhs_data.data_size == 0)
return;
if constexpr (has_limit)
{
UInt64 new_elems_count = std::min(rhs_data.num_rows, limit - cur_data.num_rows);
for (UInt64 i = 0; i < new_elems_count; ++i)
{
if (cur_data.data_size != 0)
cur_data.insertChar(delimiter.c_str(), delimiter.size(), arena);
cur_data.offsets.push_back(cur_data.data_size, arena);
cur_data.insertChar(rhs_data.data + rhs_data.getString(i), rhs_data.getSize(i), arena);
cur_data.num_rows++;
cur_data.offsets.push_back(cur_data.data_size, arena);
}
}
else
{
if (cur_data.data_size != 0)
cur_data.insertChar(delimiter.c_str(), delimiter.size(), arena);
cur_data.insertChar(rhs_data.data, rhs_data.data_size, arena);
}
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & cur_data = this->data(place);
writeVarUInt(cur_data.data_size, buf);
buf.write(cur_data.data, cur_data.data_size);
if constexpr (has_limit)
{
writeVarUInt(cur_data.num_rows, buf);
for (const auto & offset : cur_data.offsets)
writeVarUInt(offset, buf);
}
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
auto & cur_data = this->data(place);
UInt64 temp_size = 0;
readVarUInt(temp_size, buf);
cur_data.checkAndUpdateSize(temp_size, arena);
buf.readStrict(cur_data.data + cur_data.data_size, temp_size);
cur_data.data_size = temp_size;
if constexpr (has_limit)
{
readVarUInt(cur_data.num_rows, buf);
cur_data.offsets.resize_exact(cur_data.num_rows * 2, arena);
for (auto & offset : cur_data.offsets)
readVarUInt(offset, buf);
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & cur_data = this->data(place);
if (cur_data.data_size == 0)
{
to.insertDefault();
return;
}
auto & column_string = assert_cast<ColumnString &>(to);
column_string.insertData(cur_data.data, cur_data.data_size);
}
bool allocatesMemoryInArena() const override { return true; }
};
AggregateFunctionPtr createAggregateFunctionGroupConcat(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);
bool has_limit = false;
UInt64 limit = 0;
String delimiter;
if (parameters.size() > 2)
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION,
"Incorrect number of parameters for aggregate function {}, should be 0, 1 or 2, got: {}", name, parameters.size());
if (!parameters.empty())
{
auto type = parameters[0].getType();
if (type != Field::Types::String)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First parameter for aggregate function {} should be string", name);
delimiter = parameters[0].get<String>();
}
if (parameters.size() == 2)
{
auto type = parameters[1].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second parameter for aggregate function {} should be a positive number", name);
if ((type == Field::Types::Int64 && parameters[1].get<Int64>() <= 0) ||
(type == Field::Types::UInt64 && parameters[1].get<UInt64>() == 0))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second parameter for aggregate function {} should be a positive number, got: {}", name, parameters[1].get<Int64>());
has_limit = true;
limit = parameters[1].get<UInt64>();
}
if (has_limit)
return std::make_shared<GroupConcatImpl</* has_limit= */ true>>(argument_types[0], parameters, limit, delimiter);
else
return std::make_shared<GroupConcatImpl</* has_limit= */ false>>(argument_types[0], parameters, limit, delimiter);
}
}
void registerAggregateFunctionGroupConcat(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("groupConcat", { createAggregateFunctionGroupConcat, properties });
factory.registerAlias("group_concat", "groupConcat", AggregateFunctionFactory::CaseInsensitive);
}
}
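For orientation, a hedged usage sketch of the aggregate registered above: the optional first parameter is the delimiter, and the optional second parameter caps the number of concatenated rows (queries are illustrative, not taken from the test suite):
```sql
-- Concatenate values with no delimiter (the default)
SELECT groupConcat(number) FROM numbers(3);

-- Custom delimiter plus a limit of two rows
SELECT groupConcat(', ', 2)(number) FROM numbers(5);

-- The case-insensitive alias registered above works too
SELECT group_concat(number) FROM numbers(3);
```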

View File

@ -19,6 +19,7 @@ void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factor
void registerAggregateFunctionGroupUniqArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayInsertAt(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayIntersect(AggregateFunctionFactory &);
void registerAggregateFunctionGroupConcat(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantile(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileDeterministic(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExact(AggregateFunctionFactory &);
@ -120,6 +121,7 @@ void registerAggregateFunctions()
registerAggregateFunctionGroupUniqArray(factory);
registerAggregateFunctionGroupArrayInsertAt(factory);
registerAggregateFunctionGroupArrayIntersect(factory);
registerAggregateFunctionGroupConcat(factory);
registerAggregateFunctionsQuantile(factory);
registerAggregateFunctionsQuantileDeterministic(factory);
registerAggregateFunctionsQuantileExact(factory);

View File

@ -19,8 +19,7 @@ ConfigReloader::ConfigReloader(
const std::string & preprocessed_dir_,
zkutil::ZooKeeperNodeCache && zk_node_cache_,
const zkutil::EventPtr & zk_changed_event_,
Updater && updater_,
bool already_loaded)
Updater && updater_)
: config_path(config_path_)
, extra_paths(extra_paths_)
, preprocessed_dir(preprocessed_dir_)
@ -28,10 +27,15 @@ ConfigReloader::ConfigReloader(
, zk_changed_event(zk_changed_event_)
, updater(std::move(updater_))
{
if (!already_loaded)
reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true, /* initial_loading = */ true);
}
auto config = reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true, /* initial_loading = */ true);
if (config.has_value())
reload_interval = std::chrono::milliseconds(config->configuration->getInt64("config_reload_interval_ms", DEFAULT_RELOAD_INTERVAL.count()));
else
reload_interval = DEFAULT_RELOAD_INTERVAL;
LOG_TRACE(log, "Config reload interval set to {}ms", reload_interval.count());
}
void ConfigReloader::start()
{
@ -82,7 +86,17 @@ void ConfigReloader::run()
if (quit)
return;
reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false, /* initial_loading = */ false);
auto config = reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false, /* initial_loading = */ false);
if (config.has_value())
{
auto new_reload_interval = std::chrono::milliseconds(config->configuration->getInt64("config_reload_interval_ms", DEFAULT_RELOAD_INTERVAL.count()));
if (new_reload_interval != reload_interval)
{
reload_interval = new_reload_interval;
LOG_TRACE(log, "Config reload interval changed to {}ms", reload_interval.count());
}
}
}
catch (...)
{
@ -92,7 +106,7 @@ void ConfigReloader::run()
}
}
void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading)
std::optional<ConfigProcessor::LoadedConfig> ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading)
{
std::lock_guard lock(reload_mutex);
@ -120,7 +134,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
throw;
tryLogCurrentException(log, "ZooKeeper error when loading config from '" + config_path + "'");
return;
return std::nullopt;
}
catch (...)
{
@ -128,7 +142,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
throw;
tryLogCurrentException(log, "Error loading config from '" + config_path + "'");
return;
return std::nullopt;
}
config_processor.savePreprocessedConfig(loaded_config, preprocessed_dir);
@ -154,11 +168,13 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
if (throw_on_error)
throw;
tryLogCurrentException(log, "Error updating configuration from '" + config_path + "' config.");
return;
return std::nullopt;
}
LOG_DEBUG(log, "Loaded config '{}', performed update on configuration", config_path);
return loaded_config;
}
return std::nullopt;
}
struct ConfigReloader::FileWithTimestamp

View File

@ -17,8 +17,6 @@ namespace Poco { class Logger; }
namespace DB
{
class Context;
/** Every two seconds checks configuration files for update.
* If configuration is changed, then config will be reloaded by ConfigProcessor
* and the reloaded config will be applied via Updater functor.
@ -27,6 +25,8 @@ class Context;
class ConfigReloader
{
public:
static constexpr auto DEFAULT_RELOAD_INTERVAL = std::chrono::milliseconds(2000);
using Updater = std::function<void(ConfigurationPtr, bool)>;
ConfigReloader(
@ -35,8 +35,7 @@ public:
const std::string & preprocessed_dir,
zkutil::ZooKeeperNodeCache && zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
Updater && updater,
bool already_loaded);
Updater && updater);
~ConfigReloader();
@ -53,7 +52,7 @@ public:
private:
void run();
void reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading);
std::optional<ConfigProcessor::LoadedConfig> reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading);
struct FileWithTimestamp;
@ -67,8 +66,6 @@ private:
FilesChangesTracker getNewFileList() const;
static constexpr auto reload_interval = std::chrono::seconds(2);
LoggerPtr log = getLogger("ConfigReloader");
std::string config_path;
@ -85,6 +82,8 @@ private:
std::atomic<bool> quit{false};
ThreadFromGlobalPool thread;
std::chrono::milliseconds reload_interval = DEFAULT_RELOAD_INTERVAL;
/// Locked inside reloadIfNewer.
std::mutex reload_mutex;
};

View File

@ -154,6 +154,7 @@ namespace DB
M(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overridden by a merge tree setting)", 0) \
M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
M(Double, gwp_asan_force_sample_probability, 0, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp
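Since the value is exposed through `system.server_settings` (see the StorageSystemServerSettings change further below), inspecting the effective interval could look like this sketch, mirroring the integration test added in this commit:
```sql
-- Read the currently effective config reload interval, in milliseconds
SELECT value FROM system.server_settings WHERE name = 'config_reload_interval_ms';
```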

View File

@ -355,6 +355,8 @@ public:
{
return delegate->getS3StorageClient();
}
std::shared_ptr<const S3::Client> tryGetS3StorageClient() const override { return delegate->tryGetS3StorageClient(); }
#endif
private:

View File

@ -478,6 +478,8 @@ public:
"Method getS3StorageClient() is not implemented for disk type: {}",
getDataSourceDescription().toString());
}
virtual std::shared_ptr<const S3::Client> tryGetS3StorageClient() const { return nullptr; }
#endif

View File

@ -138,6 +138,11 @@ public:
{
return object_storage->getS3StorageClient();
}
std::shared_ptr<const S3::Client> tryGetS3StorageClient() override
{
return object_storage->tryGetS3StorageClient();
}
#endif
private:

View File

@ -587,6 +587,11 @@ std::shared_ptr<const S3::Client> DiskObjectStorage::getS3StorageClient() const
{
return object_storage->getS3StorageClient();
}
std::shared_ptr<const S3::Client> DiskObjectStorage::tryGetS3StorageClient() const
{
return object_storage->tryGetS3StorageClient();
}
#endif
DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const

View File

@ -214,6 +214,7 @@ public:
#if USE_AWS_S3
std::shared_ptr<const S3::Client> getS3StorageClient() const override;
std::shared_ptr<const S3::Client> tryGetS3StorageClient() const override;
#endif
private:

View File

@ -127,8 +127,10 @@ public:
/// /, /a, /a/b, /a/b/c, /a/b/c/d while exists will return true only for /a/b/c/d
virtual bool existsOrHasAnyChild(const std::string & path) const;
/// List objects recursively by certain prefix.
virtual void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const;
/// List objects recursively by certain prefix. Use it instead of listObjects, if you want to list objects lazily.
virtual ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const;
/// Get object metadata if supported. It should be possible to receive
@ -269,6 +271,7 @@ public:
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "This function is only implemented for S3ObjectStorage");
}
virtual std::shared_ptr<const S3::Client> tryGetS3StorageClient() { return nullptr; }
#endif

View File

@ -634,6 +634,10 @@ std::shared_ptr<const S3::Client> S3ObjectStorage::getS3StorageClient()
return client.get();
}
std::shared_ptr<const S3::Client> S3ObjectStorage::tryGetS3StorageClient()
{
return client.get();
}
}
#endif

View File

@ -169,6 +169,7 @@ public:
bool isReadOnly() const override { return s3_settings.get()->read_only; }
std::shared_ptr<const S3::Client> getS3StorageClient() override;
std::shared_ptr<const S3::Client> tryGetS3StorageClient() override;
private:
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);

View File

@ -6,7 +6,7 @@ namespace DB
static constexpr int FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 32 * 1024 * 1024; /// 32Mi
static constexpr int FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT = 4 * 1024 * 1024; /// 4Mi
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 5;
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 0;
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_QUEUE_SIZE_LIMIT = 5000;
static constexpr int FILECACHE_DEFAULT_LOAD_METADATA_THREADS = 16;
static constexpr int FILECACHE_DEFAULT_MAX_ELEMENTS = 10000000;

View File

@ -91,6 +91,7 @@
#include <Common/StackTrace.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/Config/ConfigReloader.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
@ -367,6 +368,9 @@ struct ContextSharedPart : boost::noncopyable
std::atomic_size_t max_view_num_to_warn = 10000lu;
std::atomic_size_t max_dictionary_num_to_warn = 1000lu;
std::atomic_size_t max_part_num_to_warn = 100000lu;
/// Only for system.server_settings, actually value stored in reloader itself
std::atomic_size_t config_reload_interval_ms = ConfigReloader::DEFAULT_RELOAD_INTERVAL.count();
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
String google_protos_path; /// Path to a directory that contains the proto files for the well-known Protobuf types.
mutable OnceFlag action_locks_manager_initialized;
@ -4503,6 +4507,16 @@ void Context::checkPartitionCanBeDropped(const String & database, const String &
checkCanBeDropped(database, table, partition_size, max_partition_size_to_drop);
}
void Context::setConfigReloaderInterval(size_t value_ms)
{
shared->config_reload_interval_ms.store(value_ms, std::memory_order_relaxed);
}
size_t Context::getConfigReloaderInterval() const
{
return shared->config_reload_interval_ms.load(std::memory_order_relaxed);
}
InputFormatPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size, const std::optional<FormatSettings> & format_settings, std::optional<size_t> max_parsing_threads) const
{
return FormatFactory::instance().getInput(name, buf, sample, shared_from_this(), max_block_size, format_settings, max_parsing_threads);

View File

@ -1161,6 +1161,9 @@ public:
size_t getMaxPartitionSizeToDrop() const;
void checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size) const;
void checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size, const size_t & max_partition_size_to_drop) const;
/// Only for system.server_settings, actual value is stored in ConfigReloader
void setConfigReloaderInterval(size_t value_ms);
size_t getConfigReloaderInterval() const;
/// Lets you select the compression codec according to the conditions described in the configuration file.
std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;

View File

@ -626,11 +626,16 @@ BlockIO InterpreterInsertQuery::execute()
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
size_t threads = presink_chains.size();
pipeline.resize(1);
pipeline.addTransform(std::make_shared<PlanSquashingTransform>(
header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
presink_chains.size()));
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
pipeline.resize(threads);
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
@ -700,8 +705,7 @@ BlockIO InterpreterInsertQuery::execute()
auto balancing = std::make_shared<PlanSquashingTransform>(
chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
presink_chains.size());
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
chain.addSource(std::move(balancing));
}

View File

@ -233,29 +233,22 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
}
#if USE_AWS_S3
try
if (auto s3_client = disk->tryGetS3StorageClient())
{
if (auto s3_client = disk->getS3StorageClient())
if (auto put_throttler = s3_client->getPutRequestThrottler())
{
if (auto put_throttler = s3_client->getPutRequestThrottler())
{
new_values[fmt::format("DiskPutObjectThrottlerRPS_{}", name)] = { put_throttler->getMaxSpeed(),
"PutObject Request throttling limit on the disk in requests per second (virtual filesystem). Local filesystems may not provide this information." };
new_values[fmt::format("DiskPutObjectThrottlerAvailable_{}", name)] = { put_throttler->getAvailable(),
"Number of PutObject requests that can be currently issued without hitting throttling limit on the disk (virtual filesystem). Local filesystems may not provide this information." };
}
if (auto get_throttler = s3_client->getGetRequestThrottler())
{
new_values[fmt::format("DiskGetObjectThrottlerRPS_{}", name)] = { get_throttler->getMaxSpeed(),
"GetObject Request throttling limit on the disk in requests per second (virtual filesystem). Local filesystems may not provide this information." };
new_values[fmt::format("DiskGetObjectThrottlerAvailable_{}", name)] = { get_throttler->getAvailable(),
"Number of GetObject requests that can be currently issued without hitting throttling limit on the disk (virtual filesystem). Local filesystems may not provide this information." };
}
new_values[fmt::format("DiskPutObjectThrottlerRPS_{}", name)] = { put_throttler->getMaxSpeed(),
"PutObject Request throttling limit on the disk in requests per second (virtual filesystem). Local filesystems may not provide this information." };
new_values[fmt::format("DiskPutObjectThrottlerAvailable_{}", name)] = { put_throttler->getAvailable(),
"Number of PutObject requests that can be currently issued without hitting throttling limit on the disk (virtual filesystem). Local filesystems may not provide this information." };
}
if (auto get_throttler = s3_client->getGetRequestThrottler())
{
new_values[fmt::format("DiskGetObjectThrottlerRPS_{}", name)] = { get_throttler->getMaxSpeed(),
"GetObject Request throttling limit on the disk in requests per second (virtual filesystem). Local filesystems may not provide this information." };
new_values[fmt::format("DiskGetObjectThrottlerAvailable_{}", name)] = { get_throttler->getAvailable(),
"Number of GetObject requests that can be currently issued without hitting throttling limit on the disk (virtual filesystem). Local filesystems may not provide this information." };
}
}
catch (...) // NOLINT(bugprone-empty-catch)
{
// Skip disk that do not have s3 throttlers
}
#endif
}

View File

@ -1230,7 +1230,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
bool no_merging_final = do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
data.merging_params.is_deleted_column.empty();
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
if (no_merging_final)
{
@ -1265,7 +1265,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
/// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
bool split_parts_ranges_into_intersecting_and_non_intersecting_final = settings.split_parts_ranges_into_intersecting_and_non_intersecting_final &&
data.merging_params.is_deleted_column.empty();
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
metadata_for_reading->getPrimaryKey(),

View File

@ -1,5 +1,4 @@
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/IProcessor.h>
#include <Common/Exception.h>
namespace DB
@ -10,136 +9,36 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), squashing(header, min_block_size_rows, min_block_size_bytes)
PlanSquashingTransform::PlanSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: IInflatingTransform(header, header), squashing(header, min_block_size_rows, min_block_size_bytes)
{
}
IProcessor::Status PlanSquashingTransform::prepare()
void PlanSquashingTransform::consume(Chunk chunk)
{
Status status = Status::Ready;
while (planning_status != PlanningStatus::FINISH)
{
switch (planning_status)
{
case INIT:
init();
break;
case READ_IF_CAN:
return prepareConsume();
case PUSH:
return sendOrFlush();
case FLUSH:
return sendOrFlush();
case FINISH:
break; /// never reached
}
}
if (status == Status::Ready)
status = finish();
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "There should be a Ready status to finish the PlanSquashing");
return status;
if (Chunk current_chunk = squashing.add(std::move(chunk)); current_chunk.hasChunkInfo())
squashed_chunk.swap(current_chunk);
}
void PlanSquashingTransform::work()
Chunk PlanSquashingTransform::generate()
{
prepare();
if (!squashed_chunk.hasChunkInfo())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform");
Chunk result_chunk;
result_chunk.swap(squashed_chunk);
return result_chunk;
}
void PlanSquashingTransform::init()
bool PlanSquashingTransform::canGenerate()
{
for (auto input: inputs)
if (!input.isFinished())
input.setNeeded();
planning_status = PlanningStatus::READ_IF_CAN;
return squashed_chunk.hasChunkInfo();
}
IProcessor::Status PlanSquashingTransform::prepareConsume()
Chunk PlanSquashingTransform::getRemaining()
{
bool all_finished = true;
for (auto & input : inputs)
{
if (!input.isFinished())
{
all_finished = false;
input.setNeeded();
}
else
continue;
if (input.hasData())
{
chunk = input.pull();
chunk = transform(std::move(chunk));
if (chunk.hasChunkInfo())
{
planning_status = PlanningStatus::PUSH;
return Status::Ready;
}
}
}
if (all_finished) /// If all inputs are closed, we check if we have data in balancing
{
if (squashing.isDataLeft()) /// If we have data in balancing, we process this data
{
planning_status = PlanningStatus::FLUSH;
chunk = flushChunk();
return Status::Ready;
}
planning_status = PlanningStatus::FINISH;
return Status::Ready;
}
return Status::NeedData;
}
Chunk PlanSquashingTransform::transform(Chunk && chunk_)
{
return squashing.add(std::move(chunk_));
}
Chunk PlanSquashingTransform::flushChunk()
{
return squashing.flush();
}
IProcessor::Status PlanSquashingTransform::sendOrFlush()
{
if (!chunk)
{
planning_status = PlanningStatus::FINISH;
return Status::Ready;
}
for (auto &output : outputs)
{
if (output.canPush())
{
if (planning_status == PlanningStatus::PUSH)
planning_status = PlanningStatus::READ_IF_CAN;
else
planning_status = PlanningStatus::FINISH;
output.push(std::move(chunk));
return Status::Ready;
}
}
return Status::PortFull;
}
IProcessor::Status PlanSquashingTransform::finish()
{
for (auto & in : inputs)
in.close();
for (auto & output : outputs)
output.finish();
return Status::Finished;
Chunk current_chunk = squashing.flush();
return current_chunk;
}
}

View File

@ -1,47 +1,29 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/IProcessor.h>
#include <Interpreters/Squashing.h>
enum PlanningStatus
{
INIT,
READ_IF_CAN,
PUSH,
FLUSH,
FINISH
};
#include <Processors/IInflatingTransform.h>
namespace DB
{
class PlanSquashingTransform : public IProcessor
class PlanSquashingTransform : public IInflatingTransform
{
public:
PlanSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports);
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "PlanSquashingTransform"; }
InputPorts & getInputPorts() { return inputs; }
OutputPorts & getOutputPorts() { return outputs; }
Status prepare() override;
void work() override;
void init();
Status prepareConsume();
Status sendOrFlush();
Status waitForDataIn();
Status finish();
Chunk transform(Chunk && chunk);
Chunk flushChunk();
protected:
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
Chunk getRemaining() override;
private:
Chunk chunk;
Squashing squashing;
PlanningStatus planning_status = PlanningStatus::INIT;
Chunk squashed_chunk;
Chunk finish_chunk;
};
}

View File

@ -125,6 +125,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int ALL_CONNECTION_TRIES_FAILED;
}
class ParallelReplicasReadingCoordinator::ImplInterface
@ -1025,7 +1026,11 @@ void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica
std::lock_guard lock(mutex);
if (!pimpl)
{
unavailable_nodes_registered_before_initialization.push_back(replica_number);
if (unavailable_nodes_registered_before_initialization.size() == replicas_count)
throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Can't connect to any replica chosen for query execution");
}
else
pimpl->markReplicaAsUnavailable(replica_number);
}
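For context, here is a query shape that can now fail fast with ALL_CONNECTION_TRIES_FAILED when every replica chosen for parallel reading is unreachable; the settings, cluster name, and table name are borrowed from the integration test added later in this commit:
```sql
SELECT key, count()
FROM tt
GROUP BY key
ORDER BY key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2,
         max_parallel_replicas = 3,
         cluster_for_parallel_replicas = 'test_1_shard_3_unavaliable_replicas',
         skip_unavailable_shards = 1;
```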

View File

@ -34,7 +34,7 @@ private:
void initialize(CoordinationMode mode);
std::mutex mutex;
size_t replicas_count{0};
const size_t replicas_count{0};
size_t mark_segment_size{0};
std::unique_ptr<ImplInterface> pimpl;
ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation

View File

@ -308,7 +308,7 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
// Generate aggregated blocks with rows less or equal than the original block.
// There should be only one output block after this transformation.
builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, 1));
builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0));
builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader(), block.rows(), 0));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

View File

@ -26,6 +26,7 @@
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Formats/FormatSettings.h>
namespace DB
@ -167,7 +168,7 @@ std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::seriali
auto & serialized_value_offsets = serialized_value_column->getOffsets();
WriteBufferFromVector<ColumnString::Chars> writer_key(serialized_key_data);
WriteBufferFromVector<ColumnString::Chars> writer_value(serialized_value_data);
FormatSettings format_settings; /// Format settings is 1.5KB, so it's not wise to create it for each row
for (auto && chunk : input_chunks)
{
const auto & columns = chunk.getColumns();
@ -175,7 +176,7 @@ std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::seriali
for (size_t i = 0; i < rows; ++i)
{
for (size_t idx = 0; idx < columns.size(); ++idx)
serializations[idx]->serializeBinary(*columns[idx], i, idx == primary_key_pos ? writer_key : writer_value, {});
serializations[idx]->serializeBinary(*columns[idx], i, idx == primary_key_pos ? writer_key : writer_value, format_settings);
/// String in ColumnString must be null-terminated
writeChar('\0', writer_key);
writeChar('\0', writer_value);

View File

@ -6,6 +6,7 @@
#include <IO/MMappedFileCache.h>
#include <IO/UncompressedCache.h>
#include <Interpreters/Context.h>
#include <Common/Config/ConfigReloader.h>
#include <Interpreters/ProcessList.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
@ -84,7 +85,8 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context
{"mmap_cache_size", {std::to_string(context->getMMappedFileCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}},
{"merge_workload", {context->getMergeWorkload(), ChangeableWithoutRestart::Yes}},
{"mutation_workload", {context->getMutationWorkload(), ChangeableWithoutRestart::Yes}}
{"mutation_workload", {context->getMutationWorkload(), ChangeableWithoutRestart::Yes}},
{"config_reload_interval_ms", {std::to_string(context->getConfigReloaderInterval()), ChangeableWithoutRestart::Yes}}
};
if (context->areBackgroundExecutorsInitialized())

View File

@ -1,13 +1,4 @@
00725_memory_tracking
01624_soft_constraints
02354_vector_search_queries
02901_parallel_replicas_rollup
02999_scalar_subqueries_bug_2
# Flaky list
01825_type_json_in_array
01414_mutations_and_errors_zookeeper
01287_max_execution_speed
# Check after ConstantNode refactoring
02154_parser_backtracking
02944_variant_as_common_type
02942_variant_cast

View File

@ -1065,6 +1065,8 @@ def main() -> int:
)
# rerun helper check
# FIXME: Find a way to identify if job restarted manually (by developer) or by automatic workflow restart (died spot-instance)
# disable rerun check for the former
if check_name not in (
CI.JobNames.BUILD_CHECK,
): # we might want to rerun build report job
@ -1076,8 +1078,7 @@ def main() -> int:
print("::group::Commit Status")
print(status)
print("::endgroup::")
# FIXME: try rerun, even if status is present. To enable manual restart via GH interface
# previous_status = status.state
previous_status = status.state
# ci cache check
if not previous_status and not ci_settings.no_ci_cache:

View File

@ -122,6 +122,10 @@ def _get_statless_tests_to_run(pr_info: PRInfo) -> List[str]:
for fpath in pr_info.changed_files:
if re.match(r"tests/queries/0_stateless/[0-9]{5}", fpath):
path_ = Path(REPO_COPY + "/" + fpath)
if not path_.exists():
logging.info("File '%s' is removed - skip", fpath)
continue
logging.info("File '%s' is changed and seems like a test", fpath)
fname = fpath.split("/")[3]
fname_without_ext = os.path.splitext(fname)[0]

View File

@ -63,7 +63,10 @@ def get_access_token_by_key_app(private_key: str, app_id: int) -> str:
"iss": app_id,
}
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256")
# FIXME: apparently should be switched to this so that mypy is happy
# jwt_instance = JWT()
# encoded_jwt = jwt_instance.encode(payload, private_key, algorithm="RS256")
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256") # type: ignore
installation_id = get_installation_id(encoded_jwt)
return get_access_token_by_jwt(encoded_jwt, installation_id)

View File

@ -13,7 +13,7 @@ from typing import List, Tuple, Union
import magic
from docker_images_helper import get_docker_image, pull_image
from env_helper import IS_CI, REPO_COPY, TEMP_PATH
from env_helper import IS_CI, REPO_COPY, TEMP_PATH, GITHUB_EVENT_PATH
from git_helper import GIT_PREFIX, git_runner
from pr_info import PRInfo
from report import ERROR, FAILURE, SUCCESS, JobReport, TestResults, read_test_results
@ -216,7 +216,8 @@ def main():
status=state,
start_time=stopwatch.start_time_str,
duration=stopwatch.duration_seconds,
additional_files=additional_files,
# add GITHUB_EVENT_PATH json file to have it in style check report. sometimes it's needed for debugging.
additional_files=additional_files + [Path(GITHUB_EVENT_PATH)],
).dump()
if state in [ERROR, FAILURE]:

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<config_reload_interval_ms>1000</config_reload_interval_ms>
</clickhouse>

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import pytest
import fnmatch
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/config_reloader.xml"],
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_reload_config(start_cluster):
assert node.wait_for_log_line(
f"Config reload interval set to 1000ms", look_behind_lines=2000
)
assert (
node.query(
"SELECT value from system.server_settings where name = 'config_reload_interval_ms'"
)
== "1000\n"
)
node.replace_in_config(
"/etc/clickhouse-server/config.d/config_reloader.xml",
"1000",
"7777",
)
assert node.wait_for_log_line(
f"Config reload interval changed to 7777ms", look_behind_lines=2000
)
assert (
node.query(
"SELECT value from system.server_settings where name = 'config_reload_interval_ms'"
)
== "7777\n"
)

View File

@ -0,0 +1,31 @@
<clickhouse>
<remote_servers>
<test_1_shard_1_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>initiator</host>
<port>9000</port>
</replica>
</shard>
</test_1_shard_1_replicas>
<test_1_shard_3_unavaliable_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_1_shard_3_unavaliable_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,50 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
initiator = cluster.add_instance(
"initiator", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster, table_name):
initiator.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
initiator.query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
)
# populate data
initiator.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)"
)
@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
cluster_name = "test_1_shard_3_unavaliable_replicas"
table_name = "tt"
create_tables(cluster_name, table_name)
with pytest.raises(QueryRuntimeException):
initiator.query(
f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3,
"cluster_for_parallel_replicas": cluster_name,
"skip_unavailable_shards": skip_unavailable_shards,
},
)

View File

@ -33,6 +33,7 @@
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>{id}</server_id>
<digest_enabled>1</digest_enabled>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -11,6 +11,7 @@
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<digest_enabled>1</digest_enabled>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -1,4 +1,6 @@
#!/usr/bin/env bash
# Tags: no-tsan
# ^ TSan uses more stack
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,2 +1,2 @@
1
102400 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/02344_describe_cache_test 5 5000 0 16
102400 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/02344_describe_cache_test 0 5000 0 16
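
The two numeric rows above read as the output of DESCRIBE FILESYSTEM CACHE before and after this change (one field flips from 5 to 0). A sketch of the statement, assuming the cache name matches the path shown in the rows:

```sql
-- Sketch: inspect the cache referenced by the rows above.
DESCRIBE FILESYSTEM CACHE '02344_describe_cache_test';
```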

View File

@ -14,6 +14,7 @@ SETTINGS disk = disk(type = cache,
max_size = '1Gi',
max_file_segment_size = '40Mi',
boundary_alignment = '20Mi',
background_download_threads = 2,
path = '$CLICKHOUSE_TEST_UNIQUE_NAME',
disk = 's3_disk');

View File

@ -1,2 +1,2 @@
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection_sql 5 5000 0 16
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection 5 5000 0 16
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection_sql 0 5000 0 16
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection 0 5000 0 16

View File

@ -1,20 +1,20 @@
100 10 10 10 0 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 5 5000 0 16
100 10 10 10 0 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
0
10
98
set max_size from 100 to 10
10 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 5 5000 0 16
10 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
1
8
set max_size from 10 to 100
100 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 5 5000 0 16
100 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
10
98
set max_elements from 10 to 2
100 2 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 5 5000 0 16
100 2 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
2
18
set max_elements from 2 to 10
100 10 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 5 5000 0 16
100 10 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
10
98

View File

@ -0,0 +1,19 @@
0 95 abc [1,2,3]
1 \N a [993,986,979,972]
2 123 makson95 []
95123
abcamakson95
[1,2,3][993,986,979,972][]
[1,2,3]
abcamakson95
95123
95\n123
95,123
abc,a,makson95
[1,2,3],[993,986,979,972]
\N
951239512395123
abc,a,makson95,abc,a,makson95,abc,a,makson95
[1,2,3][993,986,979,972][][1,2,3][993,986,979,972][][1,2,3][993,986,979,972][]
488890
488890

View File

@ -0,0 +1,57 @@
DROP TABLE IF EXISTS test_groupConcat;
CREATE TABLE test_groupConcat
(
id UInt64,
p_int Int32 NULL,
p_string String,
p_array Array(Int32)
) ENGINE = MergeTree ORDER BY id;
SET max_insert_threads = 1, max_threads = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0;
INSERT INTO test_groupConcat VALUES (0, 95, 'abc', [1, 2, 3]), (1, NULL, 'a', [993, 986, 979, 972]), (2, 123, 'makson95', []);
SELECT * FROM test_groupConcat;
SELECT groupConcat(p_int) FROM test_groupConcat;
SELECT groupConcat(p_string) FROM test_groupConcat;
SELECT groupConcat(p_array) FROM test_groupConcat;
SELECT groupConcat('', 1)(p_array) FROM test_groupConcat;
SELECT groupConcat('', 3)(p_string) FROM test_groupConcat;
SELECT groupConcat('', 2)(p_int) FROM test_groupConcat;
SELECT groupConcat('\n', 3)(p_int) FROM test_groupConcat;
SELECT groupConcat(',')(p_int) FROM test_groupConcat;
SELECT groupConcat(',')(p_string) FROM test_groupConcat;
SELECT groupConcat(',', 2)(p_array) FROM test_groupConcat;
SELECT groupConcat(p_int) FROM test_groupConcat WHERE id = 1;
INSERT INTO test_groupConcat VALUES (0, 95, 'abc', [1, 2, 3]), (1, NULL, 'a', [993, 986, 979, 972]), (2, 123, 'makson95', []);
INSERT INTO test_groupConcat VALUES (0, 95, 'abc', [1, 2, 3]), (1, NULL, 'a', [993, 986, 979, 972]), (2, 123, 'makson95', []);
SELECT groupConcat(p_int) FROM test_groupConcat;
SELECT groupConcat(',')(p_string) FROM test_groupConcat;
SELECT groupConcat(p_array) FROM test_groupConcat;
SELECT groupConcat(123)(number) FROM numbers(10); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT groupConcat(',', '3')(number) FROM numbers(10); -- { serverError BAD_ARGUMENTS }
SELECT groupConcat(',', 0)(number) FROM numbers(10); -- { serverError BAD_ARGUMENTS }
SELECT groupConcat(',', -1)(number) FROM numbers(10); -- { serverError BAD_ARGUMENTS }
SELECT groupConcat(',', 3, 3)(number) FROM numbers(10); -- { serverError TOO_MANY_ARGUMENTS_FOR_FUNCTION }
SELECT length(groupConcat(number)) FROM numbers(100000);
DROP TABLE IF EXISTS test_groupConcat;
CREATE TABLE test_groupConcat
(
id UInt64,
p_int Int32,
) ENGINE = MergeTree ORDER BY id;
INSERT INTO test_groupConcat SELECT number, number FROM numbers(100000) SETTINGS min_insert_block_size_rows = 2000;
SELECT length(groupConcat(p_int)) FROM test_groupConcat;
DROP TABLE IF EXISTS test_groupConcat;
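
To tie the queries above back to the reference file earlier in this diff, a short sketch of how the optional delimiter and limit parameters shape the result, assuming only the first three-row INSERT has run:

```sql
SELECT groupConcat(',')(p_int)      FROM test_groupConcat; -- '95,123'  (the NULL row is skipped)
SELECT groupConcat('\n', 3)(p_int)  FROM test_groupConcat; -- '95\n123' (the limit of 3 is not reached)
SELECT groupConcat(',', 2)(p_array) FROM test_groupConcat; -- '[1,2,3],[993,986,979,972]'
```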

View File

@ -0,0 +1,116 @@
2000-01-01 00:00:00 3732436800 3732436800 0
2000-01-02 00:00:00 11197396800 11197396800 0
2000-01-03 00:00:00 18662356800 18662356800 0
2000-01-04 00:00:00 26127316800 26127316800 0
2000-01-05 00:00:00 33592276800 33592276800 0
2000-01-06 00:00:00 41057236800 41057236800 0
2000-01-07 00:00:00 48522196800 48522196800 0
2000-01-08 00:00:00 55987156800 55987156800 0
2000-01-09 00:00:00 63452116800 63452116800 0
2000-01-10 00:00:00 70917076800 70917076800 0
2000-01-11 00:00:00 78382036800 78382036800 0
2000-01-12 00:00:00 85846996800 85846996800 0
2000-01-13 00:00:00 93311956800 93311956800 0
2000-01-14 00:00:00 100776916800 100776916800 0
2000-01-15 00:00:00 108241876800 108241876800 0
2000-01-16 00:00:00 115706836800 115706836800 0
2000-01-17 00:00:00 123171796800 123171796800 0
2000-01-18 00:00:00 130636756800 130636756800 0
2000-01-19 00:00:00 138101716800 138101716800 0
2000-01-20 00:00:00 145566676800 145566676800 0
2000-01-21 00:00:00 153031636800 153031636800 0
2000-01-22 00:00:00 160496596800 160496596800 0
2000-01-23 00:00:00 167961556800 167961556800 0
2000-01-24 00:00:00 175426516800 175426516800 0
2000-01-25 00:00:00 182891476800 182891476800 0
2000-01-26 00:00:00 190356436800 190356436800 0
2000-01-27 00:00:00 197821396800 197821396800 0
2000-01-28 00:00:00 205286356800 205286356800 0
2000-01-29 00:00:00 212751316800 212751316800 0
2000-01-30 00:00:00 220216276800 220216276800 0
2000-01-31 00:00:00 227681236800 227681236800 0
2000-02-01 00:00:00 235146196800 235146196800 0
2000-02-02 00:00:00 242611156800 242611156800 0
2000-02-03 00:00:00 250076116800 250076116800 0
2000-02-04 00:00:00 257541076800 257541076800 0
2000-02-05 00:00:00 265006036800 265006036800 0
2000-02-06 00:00:00 272470996800 272470996800 0
2000-02-07 00:00:00 279935956800 279935956800 0
2000-02-08 00:00:00 287400916800 287400916800 0
2000-02-09 00:00:00 294865876800 294865876800 0
2000-02-10 00:00:00 302330836800 302330836800 0
2000-02-11 00:00:00 309795796800 309795796800 0
2000-02-12 00:00:00 317260756800 317260756800 0
2000-02-13 00:00:00 324725716800 324725716800 0
2000-02-14 00:00:00 332190676800 332190676800 0
2000-02-15 00:00:00 339655636800 339655636800 0
2000-02-16 00:00:00 347120596800 347120596800 0
2000-02-17 00:00:00 354585556800 354585556800 0
2000-02-18 00:00:00 362050516800 362050516800 0
2000-02-19 00:00:00 369515476800 369515476800 0
2000-02-20 00:00:00 376980436800 376980436800 0
2000-02-21 00:00:00 384445396800 384445396800 0
2000-02-22 00:00:00 391910356800 391910356800 0
2000-02-23 00:00:00 399375316800 399375316800 0
2000-02-24 00:00:00 406840276800 406840276800 0
2000-02-25 00:00:00 414305236800 414305236800 0
2000-02-26 00:00:00 421770196800 421770196800 0
2000-02-27 00:00:00 429235156800 429235156800 0
2000-02-28 00:00:00 436700116800 436700116800 0
2000-02-29 00:00:00 444165076800 444165076800 0
2000-03-01 00:00:00 451630036800 451630036800 0
2000-03-02 00:00:00 459094996800 459094996800 0
2000-03-03 00:00:00 466559956800 466559956800 0
2000-03-04 00:00:00 474024916800 474024916800 0
2000-03-05 00:00:00 481489876800 481489876800 0
2000-03-06 00:00:00 488954836800 488954836800 0
2000-03-07 00:00:00 496419796800 496419796800 0
2000-03-08 00:00:00 503884756800 503884756800 0
2000-03-09 00:00:00 511349716800 511349716800 0
2000-03-10 00:00:00 518814676800 518814676800 0
2000-03-11 00:00:00 526279636800 526279636800 0
2000-03-12 00:00:00 533744596800 533744596800 0
2000-03-13 00:00:00 541209556800 541209556800 0
2000-03-14 00:00:00 548674516800 548674516800 0
2000-03-15 00:00:00 556139476800 556139476800 0
2000-03-16 00:00:00 563604436800 563604436800 0
2000-03-17 00:00:00 571069396800 571069396800 0
2000-03-18 00:00:00 578534356800 578534356800 0
2000-03-19 00:00:00 585999316800 585999316800 0
2000-03-20 00:00:00 593464276800 593464276800 0
2000-03-21 00:00:00 600929236800 600929236800 0
2000-03-22 00:00:00 608394196800 608394196800 0
2000-03-23 00:00:00 615859156800 615859156800 0
2000-03-24 00:00:00 623324116800 623324116800 0
2000-03-25 00:00:00 630789076800 630789076800 0
2000-03-26 00:00:00 638254036800 638254036800 0
2000-03-27 00:00:00 645718996800 645718996800 0
2000-03-28 00:00:00 653183956800 653183956800 0
2000-03-29 00:00:00 660648916800 660648916800 0
2000-03-30 00:00:00 668113876800 668113876800 0
2000-03-31 00:00:00 675578836800 675578836800 0
2000-04-01 00:00:00 683043796800 683043796800 0
2000-04-02 00:00:00 690508756800 690508756800 0
2000-04-03 00:00:00 697973716800 697973716800 0
2000-04-04 00:00:00 705438676800 705438676800 0
2000-04-05 00:00:00 712903636800 712903636800 0
2000-04-06 00:00:00 720368596800 720368596800 0
2000-04-07 00:00:00 727833556800 727833556800 0
2000-04-08 00:00:00 735298516800 735298516800 0
2000-04-09 00:00:00 742763476800 742763476800 0
2000-04-10 00:00:00 750228436800 750228436800 0
2000-04-11 00:00:00 757693396800 757693396800 0
2000-04-12 00:00:00 765158356800 765158356800 0
2000-04-13 00:00:00 772623316800 772623316800 0
2000-04-14 00:00:00 780088276800 780088276800 0
2000-04-15 00:00:00 787553236800 787553236800 0
2000-04-16 00:00:00 795018196800 795018196800 0
2000-04-17 00:00:00 802483156800 802483156800 0
2000-04-18 00:00:00 809948116800 809948116800 0
2000-04-19 00:00:00 817413076800 817413076800 0
2000-04-20 00:00:00 824878036800 824878036800 0
2000-04-21 00:00:00 832342996800 832342996800 0
2000-04-22 00:00:00 839807956800 839807956800 0
2000-04-23 00:00:00 847272916800 847272916800 0
2000-04-24 00:00:00 854737876800 854737876800 0
2000-04-25 00:00:00 637951968000 862202836800 224250868800

View File

@ -0,0 +1,12 @@
-- Tags: no-tsan, no-asan, no-msan, no-fasttest
-- Test is slow
create table tab (x DateTime('UTC'), y UInt32, v Int32) engine = ReplacingMergeTree(v) order by x;
insert into tab select toDateTime('2000-01-01', 'UTC') + number, number, 1 from numbers(1e7);
optimize table tab final;
WITH (60 * 60) * 24 AS d
select toStartOfDay(x) as k, sum(y) as v,
(z + d) * (z + d - 1) / 2 - (toUInt64(k - toDateTime('2000-01-01', 'UTC')) as z) * (z - 1) / 2 as est,
est - v as delta
from tab final group by k order by k
settings max_threads=8, optimize_aggregation_in_order=1, split_parts_ranges_into_intersecting_and_non_intersecting_final=1;
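
The `est` expression above encodes the closed-form sum of the consecutive values y = z, z+1, ..., z+d-1 that land in a fully covered day, so `delta` should be zero everywhere except the final, partially covered day (the last row of the reference above):

$$
\mathrm{est} \;=\; \sum_{i=z}^{z+d-1} i \;=\; \frac{(z+d)(z+d-1)}{2} - \frac{z(z-1)}{2}, \qquad d = 86400.
$$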

View File

@ -0,0 +1,3 @@
First
First
Second

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS test_serialization;
CREATE TABLE test_serialization
(
id UInt64,
text AggregateFunction(groupConcat, String)
) ENGINE = AggregatingMergeTree() ORDER BY id;
INSERT INTO test_serialization SELECT
1,
groupConcatState('First');
SELECT groupConcatMerge(text) AS concatenated_text FROM test_serialization GROUP BY id;
INSERT INTO test_serialization SELECT
2,
groupConcatState('Second');
SELECT groupConcatMerge(text) AS concatenated_text FROM test_serialization GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test_serialization;
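
A related way to read the stored states above without merging groups is finalizeAggregation, which finalizes each AggregateFunction value per row; a sketch against the same table (run before the final DROP):

```sql
-- Sketch: finalize each stored groupConcat state directly, one row per insert.
SELECT id, finalizeAggregation(text) AS concatenated_text
FROM test_serialization
ORDER BY id;
```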

View File

@ -48,6 +48,7 @@ AutoML
Autocompletion
AvroConfluent
BIGINT
bigrams
BIGSERIAL
BORO
BSON
@ -1008,6 +1009,7 @@ UncompressedCacheBytes
UncompressedCacheCells
UnidirectionalEdgeIsValid
UniqThetaSketch
unigrams
Updatable
Uppercased
Uptime
@ -1507,9 +1509,11 @@ deserializing
destructor
destructors
detectCharset
detectTonality
detectLanguage
detectLanguageMixed
detectLanguageUnknown
detectProgrammingLanguage
determinator
deterministically
dictGet
@ -1526,6 +1530,7 @@ disableProtocols
disjunction
disjunctions
displaySecretsInShowAndSelect
displayName
distro
divideDecimal
dmesg