Merge branch 'keeper-batch-flushes' into keeper-parallel-storage

Antonio Andelic 2023-09-08 16:26:12 +00:00
commit 9791a2ea40
206 changed files with 3902 additions and 1868 deletions

View File

@ -76,6 +76,7 @@ jobs:
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # to find ancestor merge commits necessary for finding proper docker tags
- name: Download changed aarch64 images
uses: actions/download-artifact@v3
with:

View File

@ -73,6 +73,7 @@ jobs:
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # to find ancestor merge commits necessary for finding proper docker tags
- name: Download changed aarch64 images
uses: actions/download-artifact@v3
with:

View File

@ -60,6 +60,7 @@ jobs:
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # to find ancestor merge commits necessary for finding proper docker tags
- name: Download changed aarch64 images
uses: actions/download-artifact@v3
with:

View File

@ -53,6 +53,7 @@ jobs:
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # to find ancestor merge commits necessary for finding proper docker tags
- name: Download changed aarch64 images
uses: actions/download-artifact@v3
with:

View File

@ -94,6 +94,7 @@ jobs:
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # to find ancestor merge commits necessary for finding proper docker tags
- name: Download changed aarch64 images
uses: actions/download-artifact@v3
with:

View File

@ -52,6 +52,7 @@ jobs:
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # to find ancestor merge commits necessary for finding proper docker tags
- name: Download changed aarch64 images
uses: actions/download-artifact@v3
with:

View File

@ -5,9 +5,9 @@ set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY)
set (CMAKE_SYSTEM_NAME "Linux")
set (CMAKE_SYSTEM_PROCESSOR "ppc64le")
set (CMAKE_C_COMPILER_TARGET "ppc64le-linux-gnu")
set (CMAKE_CXX_COMPILER_TARGET "ppc64le-linux-gnu")
set (CMAKE_ASM_COMPILER_TARGET "ppc64le-linux-gnu")
set (CMAKE_C_COMPILER_TARGET "powerpc64le-linux-gnu")
set (CMAKE_CXX_COMPILER_TARGET "powerpc64le-linux-gnu")
set (CMAKE_ASM_COMPILER_TARGET "powerpc64le-linux-gnu")
# Will be changed later, but somehow needed to be set here.
set (CMAKE_AR "ar")

View File

@ -126,7 +126,7 @@ if(ENABLE_OPENSSL_DYNAMIC OR ENABLE_OPENSSL)
elseif(ARCH_PPC64LE)
macro(perl_generate_asm FILE_IN FILE_OUT)
add_custom_command(OUTPUT ${FILE_OUT}
COMMAND /usr/bin/env perl ${FILE_IN} "linux64" ${FILE_OUT})
COMMAND /usr/bin/env perl ${FILE_IN} "linux64v2" ${FILE_OUT})
endmacro()
perl_generate_asm(${OPENSSL_SOURCE_DIR}/crypto/aes/asm/aes-ppc.pl ${OPENSSL_BINARY_DIR}/crypto/aes/aes-ppc.s)

View File

@ -30,7 +30,7 @@ It may lack support for new features.
## Usage {#cli_usage}
The client can be used in interactive and non-interactive (batch) mode.
The client can be used in interactive and non-interactive (batch) mode.
### Gather your connection details
<ConnectionDetails />
@ -177,8 +177,8 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--user, -u` The username. Default value: default.
- `--password` The password. Default value: empty string.
- `--ask-password` - Prompt the user to enter a password.
- `--query, -q` The query to process when using non-interactive mode. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` file path with queries to execute. Cannot be used simultaneously with `--query`.
- `--query, -q` The query to process when using non-interactive mode. `--query` can be specified multiple times, e.g. `--query "SELECT 1" --query "SELECT 2"`. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` file path with queries to execute. `--queries-file` can be specified multiple times, e.g. `--queries-file queries1.sql --queries-file queries2.sql`. Cannot be used simultaneously with `--query`.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `--multiline, -m` If specified, allow multiline queries (do not send the query on Enter).
- `--database, -d` Select the current default database. Default value: the current database from the server settings (default by default).

View File

@ -11,6 +11,8 @@ ClickHouse runs sampling profiler that allows analyzing query execution. Using p
Query profiler is automatically enabled in ClickHouse Cloud, and you can run a sample query as follows:
:::note If you are running the following query in ClickHouse Cloud, make sure to change `FROM system.trace_log` to `FROM clusterAllReplicas(default, system.trace_log)` to select from all nodes of the cluster. :::
``` sql
SELECT
count(),

View File

@ -4644,6 +4644,14 @@ SELECT toFloat64('1.7091'), toFloat64('1.5008753E7') SETTINGS precise_float_pars
└─────────────────────┴──────────────────────────┘
```
## partial_result_update_duration_ms
Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting to 0 disables partial results. Only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.
## max_rows_in_partial_result
Maximum number of rows to show in the partial result after every real-time update while the query runs (if the query contains OFFSET, use the partial result limit plus OFFSET as the value).
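A minimal illustrative sketch of how these two settings might be combined in an interactive `clickhouse-client` session (the query and values below are assumptions, not part of this change):
``` sql
-- Stream partial aggregation results to the client every 100 ms,
-- showing at most 10 rows per update, until the final result is ready.
SELECT count()
FROM numbers(1000000000)
SETTINGS partial_result_update_duration_ms = 100,
         max_rows_in_partial_result = 10,
         max_threads = 1;
```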
## validate_tcp_client_information {#validate-tcp-client-information}
Determines whether validation of client information is enabled when a query packet is received from a client using a TCP connection.

View File

@ -202,8 +202,8 @@ Arguments:
- `-S`, `--structure` — table structure for input data.
- `--input-format` — input format, `TSV` by default.
- `-f`, `--file` — path to data, `stdin` by default.
- `-q`, `--query` — queries to execute with `;` as delimiter. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` - file path with queries to execute. Cannot be used simultaneously with `--query`.
- `-q`, `--query` — queries to execute with `;` as delimiter. `--query` can be specified multiple times, e.g. `--query "SELECT 1" --query "SELECT 2"`. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` - file path with queries to execute. `--queries-file` can be specified multiple times, e.g. `--queries-file queries1.sql --queries-file queries2.sql`. Cannot be used simultaneously with `--query`.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `-N`, `--table` — table name where to put output data, `table` by default.
- `--format`, `--output-format` — output format, `TSV` by default.

View File

@ -92,6 +92,50 @@ Result:
└───┘
```
## isZeroOrNull
Returns whether the argument is 0 (zero) or [NULL](../../sql-reference/syntax.md#null-literal).
``` sql
isZeroOrNull(x)
```
**Arguments:**
- `x` — A value of non-compound data type.
**Returned value**
- `1` if `x` is 0 (zero) or `NULL`.
- `0` otherwise.
**Example**
Table:
``` text
┌─x─┬────y─┐
│ 1 │ ᴺᵁᴸᴸ │
│ 2 │ 0 │
│ 3 │ 3 │
└───┴──────┘
```
Query:
``` sql
SELECT x FROM t_null WHERE isZeroOrNull(y);
```
Result:
``` text
┌─x─┐
│ 1 │
│ 2 │
└───┘
```
## coalesce
Returns the leftmost non-`NULL` argument.

View File

@ -204,7 +204,7 @@ Other possible results:
Query:
```sql
SELECT detectLanguageMixed('Je pense que je ne parviendrai jamais à parler français comme un natif. Where theres a will, theres a way.');
SELECT detectLanguage('Je pense que je ne parviendrai jamais à parler français comme un natif. Where theres a will, theres a way.');
```
Result:

View File

@ -57,3 +57,9 @@ Output of a removed comment:
│ │
└─────────┘
```
**Caveats**
For Replicated tables, the comment can be different on different replicas. Modifying the comment applies to a single replica.
The feature has been available since version 23.9 and does not work in earlier ClickHouse versions.
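As a hedged illustration of the caveat above (the table name is hypothetical and not part of this change), a comment can be modified on one replica and read back from `system.tables`:
``` sql
-- Runs on a single replica; other replicas keep their own comment value.
ALTER TABLE my_replicated_table MODIFY COMMENT 'updated on this replica only';

SELECT comment
FROM system.tables
WHERE database = currentDatabase() AND name = 'my_replicated_table';
```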

View File

@ -391,19 +391,19 @@ DEFLATE_QPL is not available in ClickHouse Cloud.
### Specialized Codecs
These codecs are designed to make compression more effective by using specific features of data. Some of these codecs do not compress data themself. Instead, they prepare the data for a common purpose codec, which compresses it better than without this preparation.
These codecs are designed to make compression more effective by exploiting specific features of the data. Some of these codecs do not compress data themselves; instead, they preprocess the data so that a second compression stage using a general-purpose codec can achieve a higher data compression rate.
#### Delta
`Delta(delta_bytes)` — Compression approach in which raw values are replaced by the difference of two neighboring values, except for the first value that stays unchanged. Up to `delta_bytes` are used for storing delta values, so `delta_bytes` is the maximum size of raw values. Possible `delta_bytes` values: 1, 2, 4, 8. The default value for `delta_bytes` is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it's 1. Delta is a data preparation codec, i.e. cannot be used stand-alone.
`Delta(delta_bytes)` — Compression approach in which raw values are replaced by the difference of two neighboring values, except for the first value that stays unchanged. Up to `delta_bytes` are used for storing delta values, so `delta_bytes` is the maximum size of raw values. Possible `delta_bytes` values: 1, 2, 4, 8. The default value for `delta_bytes` is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it's 1. Delta is a data preparation codec, i.e. it cannot be used stand-alone.
#### DoubleDelta
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it's 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf). DoubleDelta is a data preparation codec, i.e. cannot be used stand-alone.
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it's 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf). DoubleDelta is a data preparation codec, i.e. it cannot be used stand-alone.
#### GCD
`GCD()` - - Calculates the greatest common denominator (GCD) of the values in the column, then divides each value by the GCD. Can be used with integer, decimal and date/time columns. A viable use case are timestamps or monetary values with high precision. GCD is a data preparation codec, i.e. cannot be used stand-alone.
`GCD()` - Calculates the greatest common divisor (GCD) of the values in the column, then divides each value by the GCD. Can be used with integer, decimal and date/time columns. The codec is well suited for columns with values that change (increase or decrease) in multiples of the GCD, e.g. 24, 28, 16, 24, 8, 24 (GCD = 4). GCD is a data preparation codec, i.e. it cannot be used stand-alone.
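A minimal sketch of how these preparation codecs are typically chained with a general-purpose codec (table and column names are illustrative, not part of this change):
``` sql
CREATE TABLE codec_example
(
    ts  DateTime CODEC(DoubleDelta, ZSTD),
    val UInt64   CODEC(GCD, LZ4)
)
ENGINE = MergeTree
ORDER BY ts;
```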
#### Gorilla

View File

@ -128,7 +128,7 @@ $ clickhouse-client --param_tbl="numbers" --param_db="system" --param_col="numbe
- `--port` — the port to connect to; 9000 by default. Note that the HTTP interface and the native interface use different ports.
- `--user, -u` — the username; `default` by default.
- `--password` — the password; an empty string by default.
- `--query, -q` — the query to execute, when using non-interactive mode.
- `--query, -q` — the query to execute, when using non-interactive mode. `--query` may be specified multiple times (`--query "SELECT 1;" --query "SELECT 2;"...`).
- `--queries-file` - the path to a file with queries to execute. Only one of the options `query` or `queries-file` may be specified.
- `--database, -d` — select the current default database. If not specified, the value is taken from the server settings (the `default` database by default).
- `--multiline, -m` — if specified, allow multiline queries (do not send the query on Enter).

View File

@ -116,7 +116,7 @@ $ clickhouse-client --param_tuple_in_tuple="(10, ('dt', 10))" -q "SELECT * FROM
- `--port` The port to connect to; default: 9000. Note that the HTTP interface and the native TCP interface use different ports.
- `--user, -u` The username. Default: `default`.
- `--password` The password. Default: empty string.
- `--query, -q` The query to run in non-interactive mode.
- `--query, -q` The query to run in non-interactive mode. `--query` may be specified multiple times (`--query "SELECT 1;" --query "SELECT 2;"...`).
- `--database, -d` The current default database. Default: the server's configured default (`default` by default).
- `--multiline, -m` If specified, allow multiline queries (Enter only inserts a newline and does not send the query).
- `--multiquery, -n` If specified, allow processing multiple queries separated by `;`; only effective in non-interactive mode.

View File

@ -1189,7 +1189,7 @@ void Client::processOptions(const OptionsDescription & options_description,
void Client::processConfig()
{
if (config().has("query") && config().has("queries-file"))
if (!queries.empty() && config().has("queries-file"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Options '--query' and '--queries-file' cannot be specified at the same time");
/// Batch mode is enabled if one of the following is true:
@ -1200,9 +1200,9 @@ void Client::processConfig()
/// - --queries-file command line option is present.
/// The value of the option is used as file with query (or of multiple queries) to execute.
delayed_interactive = config().has("interactive") && (config().has("query") || config().has("queries-file"));
delayed_interactive = config().has("interactive") && (!queries.empty() || config().has("queries-file"));
if (stdin_is_a_tty
&& (delayed_interactive || (!config().has("query") && queries_files.empty())))
&& (delayed_interactive || (queries.empty() && queries_files.empty())))
{
is_interactive = true;
}

View File

@ -555,11 +555,13 @@ catch (...)
void Keeper::logRevision() const
{
Poco::Logger::root().information("Starting ClickHouse Keeper " + std::string{VERSION_STRING}
+ "(revision : " + std::to_string(ClickHouseRevision::getVersionRevision())
+ ", git hash: " + (git_hash.empty() ? "<unknown>" : git_hash)
+ ", build id: " + (build_id.empty() ? "<unknown>" : build_id) + ")"
+ ", PID " + std::to_string(getpid()));
LOG_INFO(&Poco::Logger::get("Application"),
"Starting ClickHouse Keeper {} (revision: {}, git hash: {}, build id: {}), PID {}",
VERSION_STRING,
ClickHouseRevision::getVersionRevision(),
git_hash.empty() ? "<unknown>" : git_hash,
build_id.empty() ? "<unknown>" : build_id,
getpid());
}

View File

@ -319,7 +319,7 @@ static bool checkIfStdinIsRegularFile()
std::string LocalServer::getInitialCreateTableQuery()
{
if (!config().has("table-structure") && !config().has("table-file") && !config().has("table-data-format") && (!checkIfStdinIsRegularFile() || !config().has("query")))
if (!config().has("table-structure") && !config().has("table-file") && !config().has("table-data-format") && (!checkIfStdinIsRegularFile() || queries.empty()))
return {};
auto table_name = backQuoteIfNeed(config().getString("table-name", "table"));
@ -461,7 +461,7 @@ try
if (first_time)
{
if (queries_files.empty() && !config().has("query"))
if (queries_files.empty() && queries.empty())
{
std::cerr << "\033[31m" << "ClickHouse compiled in fuzzing mode." << "\033[0m" << std::endl;
std::cerr << "\033[31m" << "You have to provide a query with --query or --queries-file option." << "\033[0m" << std::endl;
@ -473,7 +473,7 @@ try
#else
is_interactive = stdin_is_a_tty
&& (config().hasOption("interactive")
|| (!config().has("query") && !config().has("table-structure") && queries_files.empty() && !config().has("table-file")));
|| (queries.empty() && !config().has("table-structure") && queries_files.empty() && !config().has("table-file")));
#endif
if (!is_interactive)
{
@ -569,10 +569,10 @@ void LocalServer::updateLoggerLevel(const String & logs_level)
void LocalServer::processConfig()
{
if (config().has("query") && config().has("queries-file"))
if (!queries.empty() && config().has("queries-file"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Options '--query' and '--queries-file' cannot be specified at the same time");
delayed_interactive = config().has("interactive") && (config().has("query") || config().has("queries-file"));
delayed_interactive = config().has("interactive") && (!queries.empty() || config().has("queries-file"));
if (is_interactive && !delayed_interactive)
{
if (config().has("multiquery"))

View File

@ -459,6 +459,10 @@ struct AnalysisOfVarianceMoments
void add(T value, size_t group)
{
if (group == std::numeric_limits<size_t>::max())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Too many groups for analysis of variance (should be no more than {}, got {})",
MAX_GROUPS_NUMBER, group);
resizeIfNeeded(group + 1);
xs1[group] += value;
xs2[group] += value * value;

View File

@ -441,7 +441,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
if (!block)
return;
processed_rows += block.rows();
if (block.rows() == 0 && partial_result_mode == PartialResultMode::Active)
{
partial_result_mode = PartialResultMode::Inactive;
if (is_interactive)
{
progress_indication.clearProgressOutput(*tty_buf);
std::cout << "Full result:" << std::endl;
progress_indication.writeProgress(*tty_buf);
}
}
if (partial_result_mode == PartialResultMode::Inactive)
processed_rows += block.rows();
/// Even if all blocks are empty, we still need to initialize the output stream to write empty resultset.
initOutputFormat(block, parsed_query);
@ -451,13 +464,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
return;
if (!is_interactive && partial_result_mode == PartialResultMode::Active)
return;
/// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker.
if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_indication.clearProgressOutput(*tty_buf);
try
{
output_format->write(materializeBlock(block));
if (partial_result_mode == PartialResultMode::Active)
output_format->writePartialResult(materializeBlock(block));
else
output_format->write(materializeBlock(block));
written_first_block = true;
}
catch (const Exception &)
@ -521,6 +541,9 @@ void ClientBase::onProfileInfo(const ProfileInfo & profile_info)
void ClientBase::initOutputFormat(const Block & block, ASTPtr parsed_query)
try
{
if (partial_result_mode == PartialResultMode::NotInit)
partial_result_mode = PartialResultMode::Active;
if (!output_format)
{
/// Ignore all results when fuzzing as they can be huge.
@ -931,6 +954,14 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
const auto & settings = global_context->getSettingsRef();
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
bool has_partial_result_setting = settings.partial_result_update_duration_ms.totalMilliseconds() > 0;
if (has_partial_result_setting)
{
partial_result_mode = PartialResultMode::NotInit;
if (is_interactive)
std::cout << "Partial result:" << std::endl;
}
int retries_left = 10;
while (retries_left)
@ -1736,6 +1767,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
processed_rows = 0;
partial_result_mode = PartialResultMode::Inactive;
written_first_block = false;
progress_indication.resetProgress();
profile_events.watch.restart();
@ -2495,23 +2527,34 @@ void ClientBase::runNonInteractive()
return;
}
String text;
if (config().has("query"))
if (!queries.empty())
{
text += config().getRawString("query"); /// Poco configuration should not process substitutions in form of ${...} inside query.
for (const auto & query : queries)
{
if (query_fuzzer_runs)
{
if (!processWithFuzzing(query))
return;
}
else
{
if (!processQueryText(query))
return;
}
}
}
else
{
/// If 'query' parameter is not set, read a query from stdin.
/// The query is read entirely into memory (streaming is disabled).
ReadBufferFromFileDescriptor in(STDIN_FILENO);
String text;
readStringUntilEOF(text, in);
if (query_fuzzer_runs)
processWithFuzzing(text);
else
processQueryText(text);
}
if (query_fuzzer_runs)
processWithFuzzing(text);
else
processQueryText(text);
}
@ -2680,8 +2723,8 @@ void ClientBase::init(int argc, char ** argv)
stderr_is_a_tty = isatty(STDERR_FILENO);
terminal_width = getTerminalWidth();
Arguments common_arguments{""}; /// 0th argument is ignored.
std::vector<Arguments> external_tables_arguments;
Arguments common_arguments = {""}; /// 0th argument is ignored.
std::vector<Arguments> hosts_and_ports_arguments;
readArguments(argc, argv, common_arguments, external_tables_arguments, hosts_and_ports_arguments);
@ -2699,7 +2742,6 @@ void ClientBase::init(int argc, char ** argv)
}
po::variables_map options;
OptionsDescription options_description;
options_description.main_description.emplace(createOptionsDescription("Main options", terminal_width));
@ -2711,9 +2753,8 @@ void ClientBase::init(int argc, char ** argv)
("config-file,C", po::value<std::string>(), "config-file path")
("query,q", po::value<std::string>(), "query")
("queries-file", po::value<std::vector<std::string>>()->multitoken(),
"file path with queries to execute; multiple files can be specified (--queries-file file1 file2...)")
("query,q", po::value<std::vector<std::string>>()->multitoken(), R"(query; can be specified multiple times (--query "SELECT 1" --query "SELECT 2"...))")
("queries-file", po::value<std::vector<std::string>>()->multitoken(), "file path with queries to execute; multiple files can be specified (--queries-file file1 file2...)")
("multiquery,n", "If specified, multiple queries separated by semicolons can be listed after --query. For convenience, it is also possible to omit --query and pass the queries directly after --multiquery.")
("multiline,m", "If specified, allow multiline queries (do not send the query on Enter)")
("database,d", po::value<std::string>(), "database")
@ -2734,8 +2775,7 @@ void ClientBase::init(int argc, char ** argv)
("log-level", po::value<std::string>(), "log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
("suggestion_limit", po::value<int>()->default_value(10000),
"Suggestion limit for how many databases, tables and columns to fetch.")
("suggestion_limit", po::value<int>()->default_value(10000), "Suggestion limit for how many databases, tables and columns to fetch.")
("format,f", po::value<std::string>(), "default output format")
("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
@ -2773,6 +2813,7 @@ void ClientBase::init(int argc, char ** argv)
std::transform(external_options.begin(), external_options.end(), std::back_inserter(cmd_options), getter);
}
po::variables_map options;
parseAndCheckOptions(options_description, options, common_arguments);
po::notify(options);
@ -2800,7 +2841,7 @@ void ClientBase::init(int argc, char ** argv)
if (options.count("time"))
print_time_to_stderr = true;
if (options.count("query"))
config().setString("query", options["query"].as<std::string>());
queries = options["query"].as<std::vector<std::string>>();
if (options.count("query_id"))
config().setString("query_id", options["query_id"].as<std::string>());
if (options.count("database"))

View File

@ -202,6 +202,7 @@ protected:
std::optional<Suggest> suggest;
bool load_suggestions = false;
std::vector<String> queries; /// Queries passed via '--query'
std::vector<String> queries_files; /// If not empty, queries will be read from these files
std::vector<String> interleave_queries_files; /// If not empty, run queries from these files before processing every file from 'queries_files'.
std::vector<String> cmd_options;
@ -271,6 +272,21 @@ protected:
size_t processed_rows = 0; /// How many rows have been read or written.
bool print_num_processed_rows = false; /// Whether to print the number of processed rows at
enum class PartialResultMode: UInt8
{
/// Query doesn't show partial result before the first block with 0 rows.
/// The first block with 0 rows initializes the output table format using its header.
NotInit,
/// Query shows partial result after the first and before the second block with 0 rows.
/// The second block with 0 rows indicates that receiving blocks with the partial result has been completed and the next blocks will contain the full result.
Active,
/// Query doesn't show partial result at all.
Inactive,
};
PartialResultMode partial_result_mode = PartialResultMode::Inactive;
bool print_stack_trace = false;
/// The last exception that was received from the server. Is used for the
/// return code in batch mode.

View File

@ -134,6 +134,8 @@ void Connection::disconnect()
if (!is_initialized)
return;
// If driver->free_me, then mysql_close will deallocate memory by calling 'free' function.
assert(driver && !driver->free_me);
mysql_close(driver.get());
memset(driver.get(), 0, sizeof(*driver));

View File

@ -5,7 +5,6 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <IO/WriteHelpers.h>
namespace DB

View File

@ -3,14 +3,8 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <IO/WriteHelpers.h>
#include "Common/Exception.h"
#include "DataTypes/IDataType.h"
#include "base/Decimal_fwd.h"
#include "base/types.h"
#include "config.h"
#include <boost/integer/common_factor.hpp>
#include <libdivide-config.h>
@ -84,7 +78,7 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest)
const char * const source_end = source + source_size;
T gcd_divider{};
T gcd_divider = 0;
const auto * cur_source = source;
while (gcd_divider != T(1) && cur_source < source_end)
{
@ -100,7 +94,7 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest)
if constexpr (sizeof(T) <= 8)
{
/// libdivide support only UInt32 and UInt64.
/// libdivide supports only UInt32 and UInt64.
using LibdivideT = std::conditional_t<sizeof(T) <= 4, UInt32, UInt64>;
libdivide::divider<LibdivideT> divider(static_cast<LibdivideT>(gcd_divider));
cur_source = source;
@ -126,8 +120,6 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest)
template <typename T>
void decompressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const char * const output_end = dest + output_size;
if (source_size % sizeof(T) != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot GCD decompress, data size {} is not aligned to {}", source_size, sizeof(T));
@ -135,11 +127,14 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest,
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot GCD decompress, data size {} is less than {}", source_size, sizeof(T));
const char * const source_end = source + source_size;
const char * const dest_end = dest + output_size;
const T gcd_multiplier = unalignedLoad<T>(source);
source += sizeof(T);
while (source < source_end)
{
if (dest + sizeof(T) > output_end) [[unlikely]]
if (dest + sizeof(T) > dest_end) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress the data");
unalignedStore<T>(dest, unalignedLoad<T>(source) * gcd_multiplier);

View File

@ -1048,18 +1048,26 @@ void Changelog::writeThread()
LOG_WARNING(log, "Changelog is shut down");
};
/// NuRaft writes a batch of requests by first calling multiple store requests (i.e. AppendLog)
/// and finishing with a flush request.
/// We assume that after some number of appends we always get a flush request.
while (true)
{
if (try_batch_flush)
{
try_batch_flush = false;
/// We have a Flush request stored in write_operation,
/// but we first try to get new append operations.
/// If there are none, we apply the currently stored Flush.
chassert(std::holds_alternative<Flush>(write_operation));
if (!write_operations.tryPop(write_operation))
{
chassert(batch_append_ok);
const auto & flush = std::get<Flush>(write_operation);
flush_logs(flush);
notify_append_completion();
continue;
if (!write_operations.pop(write_operation))
break;
}
}
else if (!write_operations.pop(write_operation))
@ -1092,10 +1100,12 @@ void Changelog::writeThread()
try_batch_flush = true;
continue;
}
/// we need to flush because we have reached the maximum allowed number of pending records
flush_logs(flush);
}
else
{
std::lock_guard lock{durable_idx_mutex};
*flush.failed = true;
}
notify_append_completion();

View File

@ -22,19 +22,16 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
for (const String & key : config_keys)
try
{
try
{
for (const String & key : config_keys)
set(key, config.getString(config_elem + "." + key));
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
LOG_WARNING(&Poco::Logger::get("CoordinationSettings"), "Found unknown coordination setting in config: '{}'", key);
else
throw;
}
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("in Coordination settings config");
throw;
}
}

View File

@ -149,7 +149,8 @@ void KeeperDispatcher::requestThread()
};
/// Wait until the previous append succeeds, or the batch is big enough
while (!shutdown_called && !has_reconfig_request && !prev_result_done() && current_batch.size() <= max_batch_size
while (!shutdown_called && !has_reconfig_request &&
!prev_result_done() && current_batch.size() <= max_batch_size
&& current_batch_bytes_size < max_batch_bytes_size)
{
try_get_request();
@ -190,18 +191,24 @@ void KeeperDispatcher::requestThread()
if (prev_result)
result_buf = forceWaitAndProcessResult(prev_result, current_batch);
/// In case of an older version or disabled async replication, result_buf will be set to the value of the `commit` function
/// which always returns nullptr.
/// In that case we don't have to wait manually because we are already sure that the batch was committed when we get
/// the result back.
/// Otherwise, we need to manually wait until the batch is committed.
if (result_buf)
{
nuraft::buffer_serializer bs(result_buf);
auto log_idx = bs.get_u64();
/// we will wake up this thread on each commit so we need to run it in a loop until the last request of the batch is committed
while (true)
{
auto current_last_committed_idx = last_committed_log_idx.load(std::memory_order_relaxed);
auto current_last_committed_idx = our_last_committed_log_idx.load(std::memory_order_relaxed);
if (current_last_committed_idx >= log_idx)
break;
last_committed_log_idx.wait(current_last_committed_idx);
our_last_committed_log_idx.wait(current_last_committed_idx);
}
}
}
@ -397,8 +404,8 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
}
}
last_committed_log_idx.store(log_idx, std::memory_order_relaxed);
last_committed_log_idx.notify_all();
our_last_committed_log_idx.store(log_idx, std::memory_order_relaxed);
our_last_committed_log_idx.notify_all();
});
try

View File

@ -107,7 +107,7 @@ private:
public:
std::mutex read_request_queue_mutex;
std::atomic<uint64_t> last_committed_log_idx = 0;
std::atomic<uint64_t> our_last_committed_log_idx = 0;
/// queue of read requests that can be processed after a request with specific session ID and XID is committed
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, KeeperStorage::RequestsForSessions>> read_request_queue;

View File

@ -167,7 +167,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
request_for_session->zxid = log_idx;
preprocess(*request_for_session);
auto result = nuraft::buffer::alloc(8);
auto result = nuraft::buffer::alloc(sizeof(log_idx));
nuraft::buffer_serializer ss(result);
ss.put_u64(log_idx);
return result;

View File

@ -250,15 +250,14 @@ KeeperStateManager::KeeperStateManager(
, log_store(nuraft::cs_new<KeeperLogStore>(
LogFileSettings
{
.force_sync = coordination_settings->force_sync,
.compress_logs = coordination_settings->compress_logs,
.rotate_interval = coordination_settings->rotate_log_storage_interval,
.max_size = coordination_settings->max_log_file_size,
.overallocate_size = coordination_settings->log_file_overallocate_size
},
.force_sync = coordination_settings->force_sync,
.compress_logs = coordination_settings->compress_logs,
.rotate_interval = coordination_settings->rotate_log_storage_interval,
.max_size = coordination_settings->max_log_file_size,
.overallocate_size = coordination_settings->log_file_overallocate_size},
FlushSettings
{
.max_flush_batch_size = coordination_settings->max_flush_batch_size,
.max_flush_batch_size = coordination_settings->max_flush_batch_size,
},
keeper_context_))
, server_state_file_name(server_state_file_name_)

View File

@ -309,6 +309,9 @@ class IColumn;
\
M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \
\
M(Milliseconds, partial_result_update_duration_ms, 0, "Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting to 0 disables partial results. Only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.", 0) \
M(UInt64, max_rows_in_partial_result, 10, "Maximum rows to show in the partial result after every real-time update while the query runs (use partial result limit + OFFSET as a value in case of OFFSET in the query).", 0) \
\
M(Bool, ignore_on_cluster_for_replicated_udf_queries, false, "Ignore ON CLUSTER clause for replicated UDF management queries.", 0) \
M(Bool, ignore_on_cluster_for_replicated_access_entities_queries, false, "Ignore ON CLUSTER clause for replicated access entities management queries.", 0) \
/** Settings for testing hedged requests */ \

View File

@ -124,11 +124,7 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
break;
}
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e, i);
}
catch (const Azure::Storage::StorageException & e)
catch (const Azure::Core::RequestFailedException & e)
{
handle_exception(e, i);
}
@ -240,10 +236,6 @@ void ReadBufferFromAzureBlobStorage::initialize()
data_stream = std::move(download_response.Value.BodyStream);
break;
}
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e, i);
}
catch (const Azure::Core::RequestFailedException & e)
{
handle_exception(e,i);

View File

@ -62,10 +62,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
func();
break;
}
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e, i);
}
catch (const Azure::Core::RequestFailedException & e)
{
handle_exception(e, i);

View File

@ -35,7 +35,7 @@ TEST(AzureBlobContainerClient, CurlMemoryLeak)
options.Retry.MaxRetries = 0;
auto client = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(unavailable_url, container, options));
EXPECT_THROW({ client->ListBlobs(); }, Azure::Core::Http::TransportException);
EXPECT_THROW({ client->ListBlobs(); }, Azure::Core::RequestFailedException);
}
#endif

View File

@ -58,6 +58,7 @@
#include <Common/HashTable/HashMap.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>
#include <Common/IPv6ToBinary.h>
#include "DataTypes/IDataType.h"
#include <Core/Types.h>
@ -87,6 +88,7 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
extern const int CANNOT_PARSE_BOOL;
}
@ -883,75 +885,179 @@ struct ConvertImpl<FromDataType, DataTypeString, Name, ConvertDefaultBehaviorTag
static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/)
{
ColumnUInt8::MutablePtr null_map = copyNullMap(arguments[0].column);
const auto & col_with_type_and_name = columnGetNested(arguments[0]);
const auto & type = static_cast<const FromDataType &>(*col_with_type_and_name.type);
const DateLUTImpl * time_zone = nullptr;
if constexpr (std::is_same_v<FromDataType, DataTypeDate> || std::is_same_v<FromDataType, DataTypeDate32>)
time_zone = &DateLUT::instance();
/// For argument of Date or DateTime type, second argument with time zone could be specified.
if constexpr (std::is_same_v<FromDataType, DataTypeDateTime> || std::is_same_v<FromDataType, DataTypeDateTime64>)
if constexpr (IsDataTypeDateOrDateTime<FromDataType>)
{
auto non_null_args = createBlockWithNestedColumns(arguments);
time_zone = &extractTimeZoneFromFunctionArguments(non_null_args, 1, 0);
}
auto datetime_arg = arguments[0];
if (const auto col_from = checkAndGetColumn<ColVecType>(col_with_type_and_name.column.get()))
{
auto col_to = ColumnString::create();
const DateLUTImpl * time_zone = nullptr;
const ColumnConst * time_zone_column = nullptr;
const typename ColVecType::Container & vec_from = col_from->getData();
ColumnString::Chars & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
size_t size = vec_from.size();
if constexpr (std::is_same_v<FromDataType, DataTypeDate>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDate32>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime64>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss.") + col_from->getScale() + 1));
else
data_to.resize(size * 3); /// Arbitrary
offsets_to.resize(size);
WriteBufferFromVector<ColumnString::Chars> write_buffer(data_to);
if (null_map)
if (arguments.size() == 1)
{
for (size_t i = 0; i < size; ++i)
auto non_null_args = createBlockWithNestedColumns(arguments);
time_zone = &extractTimeZoneFromFunctionArguments(non_null_args, 1, 0);
}
else /// When we have a column for timezone
{
datetime_arg.column = datetime_arg.column->convertToFullColumnIfConst();
if constexpr (std::is_same_v<FromDataType, DataTypeDate> || std::is_same_v<FromDataType, DataTypeDate32>)
time_zone = &DateLUT::instance();
/// For argument of Date or DateTime type, second argument with time zone could be specified.
if constexpr (std::is_same_v<FromDataType, DataTypeDateTime> || std::is_same_v<FromDataType, DataTypeDateTime64>)
{
bool is_ok = FormatImpl<FromDataType>::template execute<bool>(vec_from[i], write_buffer, &type, time_zone);
null_map->getData()[i] |= !is_ok;
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
if ((time_zone_column = checkAndGetColumnConst<ColumnString>(arguments[1].column.get())))
{
auto non_null_args = createBlockWithNestedColumns(arguments);
time_zone = &extractTimeZoneFromFunctionArguments(non_null_args, 1, 0);
}
}
}
else
const auto & col_with_type_and_name = columnGetNested(datetime_arg);
if (const auto col_from = checkAndGetColumn<ColVecType>(col_with_type_and_name.column.get()))
{
for (size_t i = 0; i < size; ++i)
auto col_to = ColumnString::create();
const typename ColVecType::Container & vec_from = col_from->getData();
ColumnString::Chars & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
size_t size = vec_from.size();
if constexpr (std::is_same_v<FromDataType, DataTypeDate>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDate32>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime64>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss.") + col_from->getScale() + 1));
else
data_to.resize(size * 3); /// Arbitrary
offsets_to.resize(size);
WriteBufferFromVector<ColumnString::Chars> write_buffer(data_to);
const auto & type = static_cast<const FromDataType &>(*col_with_type_and_name.type);
ColumnUInt8::MutablePtr null_map = copyNullMap(datetime_arg.column);
if (null_map)
{
FormatImpl<FromDataType>::template execute<void>(vec_from[i], write_buffer, &type, time_zone);
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
for (size_t i = 0; i < size; ++i)
{
if (!time_zone_column && arguments.size() > 1)
{
if (!arguments[1].column.get()->getDataAt(i).toString().empty())
time_zone = &DateLUT::instance(arguments[1].column.get()->getDataAt(i).toString());
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Provided time zone must be non-empty");
}
bool is_ok = FormatImpl<FromDataType>::template execute<bool>(vec_from[i], write_buffer, &type, time_zone);
null_map->getData()[i] |= !is_ok;
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
}
}
else
{
for (size_t i = 0; i < size; ++i)
{
if (!time_zone_column && arguments.size() > 1)
{
if (!arguments[1].column.get()->getDataAt(i).toString().empty())
time_zone = &DateLUT::instance(arguments[1].column.get()->getDataAt(i).toString());
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Provided time zone must be non-empty");
}
FormatImpl<FromDataType>::template execute<void>(vec_from[i], write_buffer, &type, time_zone);
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
}
}
write_buffer.finalize();
if (null_map)
return ColumnNullable::create(std::move(col_to), std::move(null_map));
return col_to;
}
write_buffer.finalize();
if (null_map)
return ColumnNullable::create(std::move(col_to), std::move(null_map));
return col_to;
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments[0].column->getName(), Name::name);
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments[0].column->getName(), Name::name);
{
ColumnUInt8::MutablePtr null_map = copyNullMap(arguments[0].column);
const auto & col_with_type_and_name = columnGetNested(arguments[0]);
const auto & type = static_cast<const FromDataType &>(*col_with_type_and_name.type);
const DateLUTImpl * time_zone = nullptr;
if constexpr (std::is_same_v<FromDataType, DataTypeDate> || std::is_same_v<FromDataType, DataTypeDate32>)
time_zone = &DateLUT::instance();
/// For argument of Date or DateTime type, second argument with time zone could be specified.
if constexpr (std::is_same_v<FromDataType, DataTypeDateTime> || std::is_same_v<FromDataType, DataTypeDateTime64>)
{
auto non_null_args = createBlockWithNestedColumns(arguments);
time_zone = &extractTimeZoneFromFunctionArguments(non_null_args, 1, 0);
}
if (const auto col_from = checkAndGetColumn<ColVecType>(col_with_type_and_name.column.get()))
{
auto col_to = ColumnString::create();
const typename ColVecType::Container & vec_from = col_from->getData();
ColumnString::Chars & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
size_t size = vec_from.size();
if constexpr (std::is_same_v<FromDataType, DataTypeDate>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDate32>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime64>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss.") + col_from->getScale() + 1));
else
data_to.resize(size * 3); /// Arbitrary
offsets_to.resize(size);
WriteBufferFromVector<ColumnString::Chars> write_buffer(data_to);
if (null_map)
{
for (size_t i = 0; i < size; ++i)
{
bool is_ok = FormatImpl<FromDataType>::template execute<bool>(vec_from[i], write_buffer, &type, time_zone);
null_map->getData()[i] |= !is_ok;
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
}
}
else
{
for (size_t i = 0; i < size; ++i)
{
FormatImpl<FromDataType>::template execute<void>(vec_from[i], write_buffer, &type, time_zone);
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
}
}
write_buffer.finalize();
if (null_map)
return ColumnNullable::create(std::move(col_to), std::move(null_map));
return col_to;
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments[0].column->getName(), Name::name);
}
}
};
@ -1597,7 +1703,19 @@ struct ConvertImplGenericFromString
const auto & val = col_from_string->getDataAt(i);
ReadBufferFromMemory read_buffer(val.data, val.size);
serialization_from.deserializeWholeText(column_to, read_buffer, format_settings);
try
{
serialization_from.deserializeWholeText(column_to, read_buffer, format_settings);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::CANNOT_PARSE_BOOL && typeid_cast<ColumnNullable *>(&column_to))
{
column_to.insertDefault();
continue;
}
throw;
}
if (!read_buffer.eof())
{
@ -1854,7 +1972,7 @@ public:
// toDateTime64(value, scale : Integer[, timezone: String])
|| std::is_same_v<ToDataType, DataTypeDateTime64>)
{
optional_args.push_back({"timezone", &isString<IDataType>, &isColumnConst, "const String"});
optional_args.push_back({"timezone", &isString<IDataType>, nullptr, "String"});
}
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
@ -1918,7 +2036,9 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override
{
if constexpr (std::is_same_v<ToDataType, DataTypeDateTime64>)
if constexpr (std::is_same_v<ToDataType, DataTypeString>)
return {};
else if constexpr (std::is_same_v<ToDataType, DataTypeDateTime64>)
return {2};
return {1};
}
@ -4054,15 +4174,21 @@ private:
{
if constexpr (std::is_same_v<ToDataType, DataTypeIPv4>)
{
ret = [cast_ipv4_ipv6_default_on_conversion_error_value, input_format_ipv4_default_on_conversion_error_value, requested_result_is_nullable](
ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable * column_nullable, size_t)
-> ColumnPtr
ret = [cast_ipv4_ipv6_default_on_conversion_error_value,
input_format_ipv4_default_on_conversion_error_value,
requested_result_is_nullable](
ColumnsWithTypeAndName & arguments,
const DataTypePtr & result_type,
const ColumnNullable * column_nullable,
size_t) -> ColumnPtr
{
if (!WhichDataType(result_type).isIPv4())
throw Exception(ErrorCodes::TYPE_MISMATCH, "Wrong result type {}. Expected IPv4", result_type->getName());
const auto * null_map = column_nullable ? &column_nullable->getNullMapData() : nullptr;
if (cast_ipv4_ipv6_default_on_conversion_error_value || input_format_ipv4_default_on_conversion_error_value || requested_result_is_nullable)
if (requested_result_is_nullable)
return convertToIPv4<IPStringToNumExceptionMode::Null>(arguments[0].column, null_map);
else if (cast_ipv4_ipv6_default_on_conversion_error_value || input_format_ipv4_default_on_conversion_error_value)
return convertToIPv4<IPStringToNumExceptionMode::Default>(arguments[0].column, null_map);
else
return convertToIPv4<IPStringToNumExceptionMode::Throw>(arguments[0].column, null_map);
@ -4073,16 +4199,22 @@ private:
if constexpr (std::is_same_v<ToDataType, DataTypeIPv6>)
{
ret = [cast_ipv4_ipv6_default_on_conversion_error_value, input_format_ipv6_default_on_conversion_error_value, requested_result_is_nullable](
ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable * column_nullable, size_t)
-> ColumnPtr
ret = [cast_ipv4_ipv6_default_on_conversion_error_value,
input_format_ipv6_default_on_conversion_error_value,
requested_result_is_nullable](
ColumnsWithTypeAndName & arguments,
const DataTypePtr & result_type,
const ColumnNullable * column_nullable,
size_t) -> ColumnPtr
{
if (!WhichDataType(result_type).isIPv6())
throw Exception(
ErrorCodes::TYPE_MISMATCH, "Wrong result type {}. Expected IPv6", result_type->getName());
const auto * null_map = column_nullable ? &column_nullable->getNullMapData() : nullptr;
if (cast_ipv4_ipv6_default_on_conversion_error_value || input_format_ipv6_default_on_conversion_error_value || requested_result_is_nullable)
if (requested_result_is_nullable)
return convertToIPv6<IPStringToNumExceptionMode::Null>(arguments[0].column, null_map);
else if (cast_ipv4_ipv6_default_on_conversion_error_value || input_format_ipv6_default_on_conversion_error_value)
return convertToIPv6<IPStringToNumExceptionMode::Default>(arguments[0].column, null_map);
else
return convertToIPv6<IPStringToNumExceptionMode::Throw>(arguments[0].column, null_map);
@ -4093,7 +4225,18 @@ private:
if (to_type->getCustomSerialization() && to_type->getCustomName())
{
ret = &ConvertImplGenericFromString<typename FromDataType::ColumnType>::execute;
ret = [requested_result_is_nullable](
ColumnsWithTypeAndName & arguments,
const DataTypePtr & result_type,
const ColumnNullable * column_nullable,
size_t input_rows_count) -> ColumnPtr
{
auto wrapped_result_type = result_type;
if (requested_result_is_nullable)
wrapped_result_type = makeNullable(result_type);
return ConvertImplGenericFromString<typename FromDataType::ColumnType>::execute(
arguments, wrapped_result_type, column_nullable, input_rows_count);
};
return true;
}
}
@ -4108,7 +4251,9 @@ private:
ErrorCodes::TYPE_MISMATCH, "Wrong result type {}. Expected IPv4", result_type->getName());
const auto * null_map = column_nullable ? &column_nullable->getNullMapData() : nullptr;
if (cast_ipv4_ipv6_default_on_conversion_error_value || requested_result_is_nullable)
if (requested_result_is_nullable)
return convertIPv6ToIPv4<IPStringToNumExceptionMode::Null>(arguments[0].column, null_map);
else if (cast_ipv4_ipv6_default_on_conversion_error_value)
return convertIPv6ToIPv4<IPStringToNumExceptionMode::Default>(arguments[0].column, null_map);
else
return convertIPv6ToIPv4<IPStringToNumExceptionMode::Throw>(arguments[0].column, null_map);

View File

@ -102,17 +102,13 @@ private:
if (key_argument_data_type.isArray())
{
DataTypePtr value_type;
if (1 < arguments.size())
value_type = arguments[1];
if (arguments.size() < 2 || (value_type && !isArray(value_type)))
if (arguments.size() < 2 || !arguments[1] || !isArray(arguments[1]))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} if array argument is passed as key, additional array argument as value must be passed",
getName());
const auto & key_array_type = assert_cast<const DataTypeArray &>(*arguments[0]);
const auto & value_array_type = assert_cast<const DataTypeArray &>(*value_type);
const auto & value_array_type = assert_cast<const DataTypeArray &>(*arguments[1]);
key_argument_series_type = key_array_type.getNestedType();
value_argument_series_type = value_array_type.getNestedType();

View File

@ -746,7 +746,7 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
@ -855,17 +855,25 @@ public:
template <typename DataType>
ColumnPtr executeType(const ColumnsWithTypeAndName & arguments, const DataTypePtr &) const
{
auto * times = checkAndGetColumn<typename DataType::ColumnType>(arguments[0].column.get());
auto non_const_datetime = arguments[0].column->convertToFullColumnIfConst();
auto * times = checkAndGetColumn<typename DataType::ColumnType>(non_const_datetime.get());
if (!times)
return nullptr;
const ColumnConst * format_column = checkAndGetColumnConst<ColumnString>(arguments[1].column.get());
if (!format_column)
String format;
if (const auto * format_column = checkAndGetColumnConst<ColumnString>(arguments[1].column.get()))
format = format_column->getValue<String>();
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of second ('format') argument of function {}. Must be constant string.",
arguments[1].column->getName(), getName());
String format = format_column->getValue<String>();
const ColumnConst * const_time_zone_column = nullptr;
const DateLUTImpl * time_zone = nullptr;
if (arguments.size() == 2)
time_zone = &extractTimeZoneFromFunctionArguments(arguments, 2, 0);
else if (arguments.size() > 2)
const_time_zone_column = checkAndGetColumnConst<ColumnString>(arguments[2].column.get());
UInt32 scale [[maybe_unused]] = 0;
if constexpr (std::is_same_v<DataType, DataTypeDateTime64>)
@ -893,15 +901,19 @@ public:
String out_template;
size_t out_template_size = parseFormat(format, instructions, scale, mysql_with_only_fixed_length_formatters, out_template);
const DateLUTImpl * time_zone_tmp = nullptr;
if (castType(arguments[0].type.get(), [&]([[maybe_unused]] const auto & type) { return true; }))
time_zone_tmp = &extractTimeZoneFromFunctionArguments(arguments, 2, 0);
{
if (const_time_zone_column)
time_zone = &extractTimeZoneFromFunctionArguments(arguments, 2, 0);
}
else if (std::is_same_v<DataType, DataTypeDateTime64> || std::is_same_v<DataType, DataTypeDateTime>)
time_zone_tmp = &extractTimeZoneFromFunctionArguments(arguments, 2, 0);
{
if (const_time_zone_column)
time_zone = &extractTimeZoneFromFunctionArguments(arguments, 2, 0);
}
else
time_zone_tmp = &DateLUT::instance();
time_zone = &DateLUT::instance();
const DateLUTImpl & time_zone = *time_zone_tmp;
const auto & vec = times->getData();
auto col_res = ColumnString::create();
@ -941,6 +953,13 @@ public:
auto * pos = begin;
for (size_t i = 0; i < vec.size(); ++i)
{
if (!const_time_zone_column && arguments.size() > 2)
{
if (!arguments[2].column.get()->getDataAt(i).toString().empty())
time_zone = &DateLUT::instance(arguments[2].column.get()->getDataAt(i).toString());
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Provided time zone must be non-empty");
}
if constexpr (std::is_same_v<DataType, DataTypeDateTime64>)
{
auto c = DecimalUtils::split(vec[i], scale);
@ -954,12 +973,14 @@ public:
}
for (auto & instruction : instructions)
instruction.perform(pos, static_cast<Int64>(c.whole), c.fractional, scale, time_zone);
{
instruction.perform(pos, static_cast<Int64>(c.whole), c.fractional, scale, *time_zone);
}
}
else
{
for (auto & instruction : instructions)
instruction.perform(pos, static_cast<UInt32>(vec[i]), 0, 0, time_zone);
instruction.perform(pos, static_cast<UInt32>(vec[i]), 0, 0, *time_zone);
}
*pos++ = '\0';

View File

@ -44,14 +44,18 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & types) const override
{
if (!isNumber(removeNullable(types.at(0))))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The argument of function {} must have simple numeric type, possibly Nullable", name);
if (!isNumber(removeNullable(types.at(0))) && !isNothing(removeNullable(types.at(0))))
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "The argument of function {} must have simple numeric type, possibly Nullable or Null", name);
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
if (isNothing(removeNullable(arguments[0].type)))
return DataTypeUInt8{}.createColumnConst(input_rows_count, 1);
const ColumnPtr & input_column = arguments[0].column;
ColumnPtr res;
@ -72,7 +76,10 @@ public:
return true;
}))
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The argument of function {} must have simple numeric type, possibly Nullable", name);
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The argument of function {} must have simple numeric type, possibly Nullable or Null",
name);
}
}
else
@ -89,7 +96,10 @@ public:
return true;
}))
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The argument of function {} must have simple numeric type, possibly Nullable", name);
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The argument of function {} must have simple numeric type, possibly Nullable or Null",
name);
}
}

View File

@ -11,6 +11,9 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <aws/core/utils/HashingUtils.h>
# include <aws/core/platform/FileSystem.h>
# include <Common/logger_useful.h>
# include <IO/S3/PocoHTTPClient.h>
@ -43,6 +46,8 @@ bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials,
return now >= credentials.GetExpiration() - std::chrono::seconds(expiration_window_seconds);
}
const char SSO_CREDENTIALS_PROVIDER_LOG_TAG[] = "SSOCredentialsProvider";
}
AWSEC2MetadataClient::AWSEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration, const char * endpoint_)
@ -449,6 +454,139 @@ void AwsAuthSTSAssumeRoleWebIdentityCredentialsProvider::refreshIfExpired()
Reload();
}
SSOCredentialsProvider::SSOCredentialsProvider(DB::S3::PocoHTTPClientConfiguration aws_client_configuration_, uint64_t expiration_window_seconds_)
: profile_to_use(Aws::Auth::GetConfigProfileName())
, aws_client_configuration(std::move(aws_client_configuration_))
, expiration_window_seconds(expiration_window_seconds_)
, logger(&Poco::Logger::get(SSO_CREDENTIALS_PROVIDER_LOG_TAG))
{
LOG_INFO(logger, "Setting sso credentials provider to read config from {}", profile_to_use);
}
Aws::Auth::AWSCredentials SSOCredentialsProvider::GetAWSCredentials()
{
refreshIfExpired();
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
return credentials;
}
void SSOCredentialsProvider::Reload()
{
auto profile = Aws::Config::GetCachedConfigProfile(profile_to_use);
const auto access_token = [&]
{
// If we have an SSO Session set, use the refreshed token.
if (profile.IsSsoSessionSet())
{
sso_region = profile.GetSsoSession().GetSsoRegion();
auto token = bearer_token_provider.GetAWSBearerToken();
expires_at = token.GetExpiration();
return token.GetToken();
}
Aws::String hashed_start_url = Aws::Utils::HashingUtils::HexEncode(Aws::Utils::HashingUtils::CalculateSHA1(profile.GetSsoStartUrl()));
auto profile_directory = Aws::Auth::ProfileConfigFileAWSCredentialsProvider::GetProfileDirectory();
Aws::StringStream ss_token;
ss_token << profile_directory;
ss_token << Aws::FileSystem::PATH_DELIM << "sso" << Aws::FileSystem::PATH_DELIM << "cache" << Aws::FileSystem::PATH_DELIM << hashed_start_url << ".json";
auto sso_token_path = ss_token.str();
LOG_INFO(logger, "Loading token from: {}", sso_token_path);
sso_region = profile.GetSsoRegion();
return loadAccessTokenFile(sso_token_path);
}();
if (access_token.empty())
{
LOG_TRACE(logger, "Access token for SSO not available");
return;
}
if (expires_at < Aws::Utils::DateTime::Now())
{
LOG_TRACE(logger, "Cached Token expired at {}", expires_at.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
return;
}
Aws::Internal::SSOCredentialsClient::SSOGetRoleCredentialsRequest request;
request.m_ssoAccountId = profile.GetSsoAccountId();
request.m_ssoRoleName = profile.GetSsoRoleName();
request.m_accessToken = access_token;
aws_client_configuration.scheme = Aws::Http::Scheme::HTTPS;
aws_client_configuration.region = sso_region;
LOG_TRACE(logger, "Passing config to client for region: {}", sso_region);
Aws::Vector<Aws::String> retryable_errors;
retryable_errors.push_back("TooManyRequestsException");
aws_client_configuration.retryStrategy = Aws::MakeShared<Aws::Client::SpecifiedRetryableErrorsRetryStrategy>(SSO_CREDENTIALS_PROVIDER_LOG_TAG, retryable_errors, /*maxRetries=*/3);
client = Aws::MakeUnique<Aws::Internal::SSOCredentialsClient>(SSO_CREDENTIALS_PROVIDER_LOG_TAG, aws_client_configuration);
LOG_TRACE(logger, "Requesting credentials with AWS_ACCESS_KEY: {}", sso_account_id);
auto result = client->GetSSOCredentials(request);
LOG_TRACE(logger, "Successfully retrieved credentials with AWS_ACCESS_KEY: {}", result.creds.GetAWSAccessKeyId());
credentials = result.creds;
}
void SSOCredentialsProvider::refreshIfExpired()
{
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
if (!areCredentialsEmptyOrExpired(credentials, expiration_window_seconds))
return;
guard.UpgradeToWriterLock();
if (!areCredentialsEmptyOrExpired(credentials, expiration_window_seconds)) // double-checked lock to avoid refreshing twice
return;
Reload();
}
Aws::String SSOCredentialsProvider::loadAccessTokenFile(const Aws::String & sso_access_token_path)
{
LOG_TRACE(logger, "Preparing to load token from: {}", sso_access_token_path);
Aws::IFStream input_file(sso_access_token_path.c_str());
if (input_file)
{
LOG_TRACE(logger, "Reading content from token file: {}", sso_access_token_path);
Aws::Utils::Json::JsonValue token_doc(input_file);
if (!token_doc.WasParseSuccessful())
{
LOG_TRACE(logger, "Failed to parse token file: {}", sso_access_token_path);
return "";
}
Aws::Utils::Json::JsonView token_view(token_doc);
Aws::String tmp_access_token, expiration_str;
tmp_access_token = token_view.GetString("accessToken");
expiration_str = token_view.GetString("expiresAt");
Aws::Utils::DateTime expiration(expiration_str, Aws::Utils::DateFormat::ISO_8601);
LOG_TRACE(logger, "Token cache file contains accessToken [{}], expiration [{}]", tmp_access_token, expiration_str);
if (tmp_access_token.empty() || !expiration.WasParseSuccessful())
{
LOG_TRACE(logger, R"(The SSO session associated with this profile has expired or is otherwise invalid. To refresh this SSO session run aws sso login with the corresponding profile.)");
LOG_TRACE(
logger,
"Token cache file failed because {}{}",
(tmp_access_token.empty() ? "AccessToken was empty " : ""),
(!expiration.WasParseSuccessful() ? "failed to parse expiration" : ""));
return "";
}
expires_at = expiration;
return tmp_access_token;
}
else
{
LOG_TRACE(logger, "Unable to open token file on path: {}", sso_access_token_path);
return "";
}
}
S3CredentialsProviderChain::S3CredentialsProviderChain(
const DB::S3::PocoHTTPClientConfiguration & configuration,
const Aws::Auth::AWSCredentials & credentials,
@ -494,6 +632,18 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
AddProvider(std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>());
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
AddProvider(std::make_shared<SSOCredentialsProvider>(
std::move(aws_client_configuration), credentials_configuration.expiration_window_seconds));
}
/// ECS TaskRole Credentials only available when ENVIRONMENT VARIABLE is set.
const auto relative_uri = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI);
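A minimal sketch of the double-checked refresh pattern used in SSOCredentialsProvider::refreshIfExpired(), with std::shared_mutex instead of the AWS SDK ReaderLockGuard (which can upgrade in place); because the shared lock here is released before the exclusive lock is taken, the expiry check is repeated so that two threads never refresh twice.
#include <chrono>
#include <mutex>
#include <shared_mutex>
struct CachedCredentials
{
    std::shared_mutex mutex;
    std::chrono::steady_clock::time_point expires_at{};
    bool expired() const { return std::chrono::steady_clock::now() >= expires_at; }
    void reload()
    {
        /// Fetch fresh credentials here; the sketch only pushes the expiration forward.
        expires_at = std::chrono::steady_clock::now() + std::chrono::minutes(15);
    }
    void refreshIfExpired()
    {
        {
            std::shared_lock read_lock(mutex);  /// cheap path: credentials still valid
            if (!expired())
                return;
        }
        std::unique_lock write_lock(mutex);
        if (!expired())                         /// double-checked: another thread refreshed already
            return;
        reload();
    }
};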

View File

@ -8,6 +8,7 @@
# include <aws/core/internal/AWSHttpResourceClient.h>
# include <aws/core/config/AWSProfileConfigLoader.h>
# include <aws/core/auth/AWSCredentialsProviderChain.h>
# include <aws/core/auth/bearer-token-provider/SSOBearerTokenProvider.h>
# include <IO/S3/PocoHTTPClient.h>
@ -124,6 +125,39 @@ private:
uint64_t expiration_window_seconds;
};
class SSOCredentialsProvider : public Aws::Auth::AWSCredentialsProvider
{
public:
SSOCredentialsProvider(DB::S3::PocoHTTPClientConfiguration aws_client_configuration_, uint64_t expiration_window_seconds_);
Aws::Auth::AWSCredentials GetAWSCredentials() override;
private:
Aws::UniquePtr<Aws::Internal::SSOCredentialsClient> client;
Aws::Auth::AWSCredentials credentials;
// Profile description variables
Aws::String profile_to_use;
// The AWS account ID that temporary AWS credentials are resolved for.
Aws::String sso_account_id;
// The AWS region where the SSO directory for the given sso_start_url is hosted.
// This is independent of the general region configuration and MUST NOT be conflated.
Aws::String sso_region;
// The expiration time of the accessToken.
Aws::Utils::DateTime expires_at;
// The SSO Token Provider
Aws::Auth::SSOBearerTokenProvider bearer_token_provider;
DB::S3::PocoHTTPClientConfiguration aws_client_configuration;
uint64_t expiration_window_seconds;
Poco::Logger * logger;
void Reload() override;
void refreshIfExpired();
Aws::String loadAccessTokenFile(const Aws::String & sso_access_token_path);
};
struct CredentialsConfiguration
{
bool use_environment_credentials = false;

View File

@ -238,8 +238,15 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf)
++num_copied_chars;
}
auto res = fast_float::from_chars(tmp_buf, tmp_buf + num_copied_chars, x);
fast_float::from_chars_result res;
if constexpr (std::endian::native == std::endian::little)
res = fast_float::from_chars(tmp_buf, tmp_buf + num_copied_chars, x);
else
{
Float64 x64 = 0.0;
res = fast_float::from_chars(tmp_buf, tmp_buf + num_copied_chars, x64);
x = static_cast<T>(x64);
}
if (unlikely(res.ec != std::errc()))
{
if constexpr (throw_exception)

View File

@ -2506,6 +2506,96 @@ ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
return result_dag;
}
ActionsDAG::NodeRawConstPtrs ActionsDAG::extractConjunctionAtoms(const Node * predicate)
{
NodeRawConstPtrs atoms;
std::stack<const ActionsDAG::Node *> stack;
stack.push(predicate);
while (!stack.empty())
{
const auto * node = stack.top();
stack.pop();
if (node->type == ActionsDAG::ActionType::FUNCTION)
{
const auto & name = node->function_base->getName();
if (name == "and")
{
for (const auto * arg : node->children)
stack.push(arg);
continue;
}
}
atoms.push_back(node);
}
return atoms;
}
ActionsDAG::NodeRawConstPtrs ActionsDAG::filterNodesByAllowedInputs(
NodeRawConstPtrs nodes,
const std::unordered_set<const Node *> & allowed_inputs)
{
size_t result_size = 0;
std::unordered_map<const ActionsDAG::Node *, bool> can_compute;
struct Frame
{
const ActionsDAG::Node * node;
size_t next_child_to_visit = 0;
bool can_compute_all_childern = true;
};
std::stack<Frame> stack;
for (const auto * node : nodes)
{
if (!can_compute.contains(node))
stack.push({node});
while (!stack.empty())
{
auto & frame = stack.top();
bool need_visit_child = false;
while (frame.next_child_to_visit < frame.node->children.size())
{
auto it = can_compute.find(frame.node->children[frame.next_child_to_visit]);
if (it == can_compute.end())
{
stack.push({frame.node->children[frame.next_child_to_visit]});
need_visit_child = true;
break;
}
frame.can_compute_all_childern &= it->second;
++frame.next_child_to_visit;
}
if (need_visit_child)
continue;
if (frame.node->type == ActionsDAG::ActionType::INPUT)
can_compute[frame.node] = allowed_inputs.contains(frame.node);
else
can_compute[frame.node] = frame.can_compute_all_childern;
stack.pop();
}
if (can_compute.at(node))
{
nodes[result_size] = node;
++result_size;
}
}
nodes.resize(result_size);
return nodes;
}
FindOriginalNodeForOutputName::FindOriginalNodeForOutputName(const ActionsDAGPtr & actions_)
:actions(actions_)
{

View File

@ -384,6 +384,16 @@ public:
const ContextPtr & context,
bool single_output_condition_node = true);
/// Check if `predicate` is a combination of AND functions.
/// Returns a list of nodes representing atomic predicates.
static NodeRawConstPtrs extractConjunctionAtoms(const Node * predicate);
/// Get a list of nodes. For every node, check if it can be computed using the allowed subset of inputs.
/// Returns only those nodes from the list which can be computed.
static NodeRawConstPtrs filterNodesByAllowedInputs(
NodeRawConstPtrs nodes,
const std::unordered_set<const Node *> & allowed_inputs);
private:
NodeRawConstPtrs getParents(const Node * target) const;
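A rough illustration of the two helpers declared above (simplified Node type, not the real ActionsDAG::Node): extracting conjunction atoms is an iterative walk that expands every "and" node, so a predicate like and(and(a, b), c) yields the atoms {a, b, c}; filterNodesByAllowedInputs then keeps only the atoms whose inputs are all in the allowed set.
#include <stack>
#include <string>
#include <vector>
struct Node
{
    std::string name;                    /// function name ("and", "equals", ...) or a column name
    std::vector<const Node *> children;  /// empty for inputs/columns
};
std::vector<const Node *> extractConjunctionAtoms(const Node * predicate)
{
    std::vector<const Node *> atoms;
    std::stack<const Node *> stack;
    stack.push(predicate);
    while (!stack.empty())
    {
        const Node * node = stack.top();
        stack.pop();
        if (node->name == "and")
        {
            for (const Node * child : node->children)
                stack.push(child);
            continue;
        }
        atoms.push_back(node);
    }
    return atoms;
}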

View File

@ -345,7 +345,7 @@ Block createBlockForSet(
{
auto get_tuple_type_from_ast = [context](const auto & func) -> DataTypePtr
{
if (func && (func->name == "tuple" || func->name == "array") && !func->arguments->children.empty())
if ((func->name == "tuple" || func->name == "array") && !func->arguments->children.empty())
{
/// Won't parse all values of outer tuple.
auto element = func->arguments->children.at(0);
@ -356,6 +356,7 @@ Block createBlockForSet(
return evaluateConstantExpression(func, context).second;
};
assert(right_arg);
const DataTypePtr & right_arg_type = get_tuple_type_from_ast(right_arg);
size_t left_tuple_depth = getTypeDepth(left_arg_type);

View File

@ -2272,6 +2272,29 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
return block;
}
Block Aggregator::prepareBlockAndFillWithoutKeySnapshot(AggregatedDataVariants & data_variants) const
{
size_t rows = 1;
bool final = true;
auto && out_cols
= prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), data_variants.aggregates_pools, final, rows);
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
AggregatedDataWithoutKey & data = data_variants.without_key;
/// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
for (size_t insert_i = 0; insert_i < params.aggregates_size; ++insert_i)
aggregate_functions[insert_i]->insertResultInto(
data + offsets_of_aggregate_states[insert_i],
*final_aggregate_columns[insert_i],
data_variants.aggregates_pool);
Block block = finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows);
return block;
}
template <bool return_single_block>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const

View File

@ -1210,6 +1210,7 @@ private:
friend class ConvertingAggregatedToChunksSource;
friend class ConvertingAggregatedToChunksWithMergingSource;
friend class AggregatingInOrderTransform;
friend class AggregatingPartialResultTransform;
/// Data structure of source blocks.
Block header;
@ -1391,6 +1392,7 @@ private:
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillWithoutKeySnapshot(AggregatedDataVariants & data_variants) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <bool return_single_block>

View File

@ -420,8 +420,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name, address.port,
@ -564,7 +562,6 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
ShardInfoInsertPathForInternalReplication && insert_paths, UInt32 weight, bool internal_replication)
{
Addresses shard_local_addresses;
Addresses shard_all_addresses;
ConnectionPoolPtrs all_replicas_pools;
all_replicas_pools.reserve(addresses.size());
@ -582,7 +579,6 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
all_replicas_pools.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
shard_local_addresses.push_back(replica);
shard_all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing,
@ -596,7 +592,6 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
current_shard_num,
weight,
std::move(shard_local_addresses),
std::move(shard_all_addresses),
std::move(shard_pool),
std::move(all_replicas_pools),
internal_replication
@ -720,8 +715,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name,

View File

@ -217,7 +217,6 @@ public:
UInt32 shard_num = 0;
UInt32 weight = 1;
Addresses local_addresses;
Addresses all_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
/// Connection pool for each replica, contains nullptr for local replicas

View File

@ -123,7 +123,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*replica_num=*/0, /*replica_count=*/0, /*coordinator=*/nullptr));
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
};
auto emplace_remote_stream = [&](bool lazy = false, time_t local_delay = 0)

View File

@ -156,10 +156,10 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
if (typeid_cast<DatabaseReplicated *>(database.get()))
{
int command_types_count = !mutation_commands.empty() + !partition_commands.empty() + !alter_commands.empty();
bool mixed_settings_amd_metadata_alter = alter_commands.hasSettingsAlterCommand() && !alter_commands.isSettingsAlter();
bool mixed_settings_amd_metadata_alter = alter_commands.hasNonReplicatedAlterCommand() && !alter_commands.areNonReplicatedAlterCommands();
if (1 < command_types_count || mixed_settings_amd_metadata_alter)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "For Replicated databases it's not allowed "
"to execute ALTERs of different types in single query");
"to execute ALTERs of different types (replicated and non replicated) in single query");
}
if (mutation_commands.hasNonEmptyMutationCommands())

View File

@ -526,6 +526,7 @@ bool ParserStorage::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
break;
}
// If any part of the storage definition is found, create a storage node
if (!storage_like)
return false;

View File

@ -14,7 +14,8 @@ namespace ErrorCodes
extern const int POSITION_OUT_OF_BOUND;
}
Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) : columns(std::move(columns_)), num_rows(num_rows_)
Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_)
: columns(std::move(columns_)), num_rows(num_rows_)
{
checkNumRowsIsConsistent();
}

View File

@ -75,7 +75,7 @@ void CompletedPipelineExecutor::execute()
if (interactive_timeout_ms)
{
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());
/// Avoid passing this to lambda, copy ptr to data instead.
@ -105,7 +105,7 @@ void CompletedPipelineExecutor::execute()
}
else
{
PipelineExecutor executor(pipeline.processors, pipeline.process_list_element);
PipelineExecutor executor(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
executor.setReadProgressCallback(pipeline.getReadProgressCallback());
executor.execute(pipeline.getNumThreads(), pipeline.getConcurrencyControl());
}

View File

@ -260,7 +260,6 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
{
pid = updated_processors.top();
updated_processors.pop();
/// In this method we have ownership on node.
auto & node = *nodes[pid];

View File

@ -30,6 +30,12 @@ private:
/// Callback for read progress.
ReadProgressCallback * read_progress_callback = nullptr;
/// Timer that stops the optimization of running local tasks instead of queuing them.
/// It provides local progress for each IProcessor task, so that the partial result of the query can always be sent to the user.
Stopwatch watch;
/// Maximum allowed duration for optimizing the scheduling of local tasks within the executor.
const UInt64 partial_result_duration_ms;
public:
#ifndef NDEBUG
/// Time for different processing stages.
@ -62,8 +68,13 @@ public:
void setException(std::exception_ptr exception_) { exception = exception_; }
void rethrowExceptionIfHas();
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback)
bool needWatchRestartForPartialResultProgress() { return partial_result_duration_ms != 0 && partial_result_duration_ms < watch.elapsedMilliseconds(); }
void restartWatch() { watch.restart(); }
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback, UInt64 partial_result_duration_ms_)
: read_progress_callback(callback)
, watch(CLOCK_MONOTONIC)
, partial_result_duration_ms(partial_result_duration_ms_)
, thread_number(thread_number_)
, profile_processors(profile_processors_)
, trace_processors(trace_processors_)

View File

@ -108,8 +108,15 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
{
context.setTask(nullptr);
/// Take local task from queue if has one.
if (!queue.empty() && !context.hasAsyncTasks())
/// If sending partial results is allowed and the local task scheduling optimization has been running longer than the limit,
/// or the new task needs to send a partial result later, skip the optimization for this iteration.
/// Otherwise take a local task from the queue if there is one.
if ((!queue.empty() && queue.front()->processor->isPartialResultProcessor())
|| context.needWatchRestartForPartialResultProgress())
{
context.restartWatch();
}
else if (!queue.empty() && !context.hasAsyncTasks())
{
context.setTask(queue.front());
queue.pop();
@ -139,7 +146,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
}
}
void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback, UInt64 partial_result_duration_ms)
{
num_threads = num_threads_;
use_threads = use_threads_;
@ -151,7 +158,7 @@ void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, trace_processors, callback));
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, trace_processors, callback, partial_result_duration_ms));
}
}
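A minimal sketch (illustrative Task/ThreadContext types, not the real executor classes) of the scheduling rule added above: a local task is run in place only if it is not a partial result processor and the local-optimization window has not been exceeded; otherwise the watch is restarted and the task goes through the common queue.
#include <chrono>
#include <deque>
struct Task { bool is_partial_result = false; };
struct ThreadContext
{
    std::chrono::steady_clock::time_point watch_start = std::chrono::steady_clock::now();
    std::chrono::milliseconds partial_result_duration{0};   /// 0 disables the limit
    bool needWatchRestartForPartialResultProgress() const
    {
        return partial_result_duration.count() != 0
            && std::chrono::steady_clock::now() - watch_start > partial_result_duration;
    }
    void restartWatch() { watch_start = std::chrono::steady_clock::now(); }
};
/// Returns the task to execute locally, or nullptr if it should be pushed to the common queue.
Task * pickLocalTask(std::deque<Task> & queue, ThreadContext & context)
{
    if (queue.empty())
        return nullptr;
    if (queue.front().is_partial_result || context.needWatchRestartForPartialResultProgress())
    {
        context.restartWatch();
        return nullptr;
    }
    return &queue.front();
}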

View File

@ -58,7 +58,7 @@ public:
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback, UInt64 partial_result_duration_ms);
void fill(Queue & queue);
void upscale(size_t use_threads_);

View File

@ -33,8 +33,9 @@ namespace ErrorCodes
}
PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem)
PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, UInt64 partial_result_duration_ms_)
: process_list_element(std::move(elem))
, partial_result_duration_ms(partial_result_duration_ms_)
{
if (process_list_element)
{
@ -328,7 +329,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
Queue queue;
graph->initializeExecution(queue);
tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get());
tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get(), partial_result_duration_ms);
tasks.fill(queue);
if (num_threads > 1)

View File

@ -33,7 +33,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to existing set.
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem);
explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, UInt64 partial_result_duration_ms_ = 0);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.
@ -90,6 +90,9 @@ private:
ReadProgressCallbackPtr read_progress_callback;
/// Interval between sending partial results through the pipeline
const UInt64 partial_result_duration_ms;
using Queue = std::queue<ExecutingGraph::Node *>;
void initializeExecution(size_t num_threads, bool concurrency_control); /// Initialize executor contexts and task_queue.

View File

@ -41,12 +41,13 @@ struct PullingAsyncPipelineExecutor::Data
}
};
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting) : pipeline(pipeline_)
{
if (!pipeline.pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingAsyncPipelineExecutor must be pulling");
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.output->getHeader());
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.output->getHeader(), /*is_partial_result_protocol_active*/ has_partial_result_setting);
pipeline.complete(lazy_format);
}
@ -103,7 +104,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (!data)
{
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());
data->lazy_format = lazy_format.get();

View File

@ -21,7 +21,7 @@ struct ProfileInfo;
class PullingAsyncPipelineExecutor
{
public:
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_);
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting = false);
~PullingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.

View File

@ -44,7 +44,7 @@ bool PullingPipelineExecutor::pull(Chunk & chunk)
{
if (!executor)
{
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
}

View File

@ -167,7 +167,7 @@ void PushingAsyncPipelineExecutor::start()
started = true;
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());
data->source = pushing_source.get();

View File

@ -87,7 +87,7 @@ void PushingPipelineExecutor::start()
return;
started = true;
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
if (!executor->executeStep(&input_wait_flag))

View File

@ -1,40 +1,89 @@
#include <Processors/Formats/IOutputFormat.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
namespace DB
{
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_)
: IProcessor({header_, header_, header_}, {}), out(out_)
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_, bool is_partial_result_protocol_active_)
: IProcessor({header_, header_, header_, header_}, {})
, out(out_)
, is_partial_result_protocol_active(is_partial_result_protocol_active_)
{
}
void IOutputFormat::setCurrentChunk(InputPort & input, PortKind kind)
{
current_chunk = input.pull(true);
current_block_kind = kind;
has_input = true;
}
IOutputFormat::Status IOutputFormat::prepareMainAndPartialResult()
{
bool need_data = false;
for (auto kind : {Main, PartialResult})
{
auto & input = getPort(kind);
if (input.isFinished())
continue;
if (kind == PartialResult && main_input_activated)
{
input.close();
continue;
}
input.setNeeded();
need_data = true;
if (!input.hasData())
continue;
setCurrentChunk(input, kind);
return Status::Ready;
}
if (need_data)
return Status::NeedData;
return Status::Finished;
}
IOutputFormat::Status IOutputFormat::prepareTotalsAndExtremes()
{
for (auto kind : {Totals, Extremes})
{
auto & input = getPort(kind);
if (!input.isConnected() || input.isFinished())
continue;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
setCurrentChunk(input, kind);
return Status::Ready;
}
return Status::Finished;
}
IOutputFormat::Status IOutputFormat::prepare()
{
if (has_input)
return Status::Ready;
for (auto kind : {Main, Totals, Extremes})
{
auto & input = getPort(kind);
auto status = prepareMainAndPartialResult();
if (status != Status::Finished)
return status;
if (kind != Main && !input.isConnected())
continue;
if (input.isFinished())
continue;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull(true);
current_block_kind = kind;
has_input = true;
return Status::Ready;
}
status = prepareTotalsAndExtremes();
if (status != Status::Finished)
return status;
finished = true;
@ -83,8 +132,18 @@ void IOutputFormat::work()
case Main:
result_rows += current_chunk.getNumRows();
result_bytes += current_chunk.allocatedBytes();
if (is_partial_result_protocol_active && !main_input_activated && current_chunk.hasRows())
{
/// Sending an empty block signals to the client that partial results are terminated,
/// and only data from the main pipeline will be forwarded.
consume(Chunk(current_chunk.cloneEmptyColumns(), 0));
main_input_activated = true;
}
consume(std::move(current_chunk));
break;
case PartialResult:
consumePartialResult(std::move(current_chunk));
break;
case Totals:
writeSuffixIfNeeded();
if (auto totals = prepareTotals(std::move(current_chunk)))
@ -119,6 +178,15 @@ void IOutputFormat::write(const Block & block)
flush();
}
void IOutputFormat::writePartialResult(const Block & block)
{
writePrefixIfNeeded();
consumePartialResult(Chunk(block.getColumns(), block.rows()));
if (auto_flush)
flush();
}
void IOutputFormat::finalize()
{
if (finalized)

View File

@ -23,9 +23,9 @@ class WriteBuffer;
class IOutputFormat : public IProcessor
{
public:
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
enum PortKind { Main = 0, Totals = 1, Extremes = 2, PartialResult = 3 };
IOutputFormat(const Block & header_, WriteBuffer & out_);
IOutputFormat(const Block & header_, WriteBuffer & out_, bool is_partial_result_protocol_active_ = false);
Status prepare() override;
void work() override;
@ -54,6 +54,7 @@ public:
/// TODO: separate formats and processors.
void write(const Block & block);
void writePartialResult(const Block & block);
void finalize();
@ -118,6 +119,7 @@ protected:
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void consumePartialResult(Chunk) {}
virtual void finalizeImpl() {}
virtual void finalizeBuffers() {}
virtual void writePrefix() {}
@ -166,6 +168,7 @@ protected:
Chunk current_chunk;
PortKind current_block_kind = PortKind::Main;
bool main_input_activated = false;
bool has_input = false;
bool finished = false;
bool finalized = false;
@ -180,9 +183,15 @@ protected:
Statistics statistics;
private:
void setCurrentChunk(InputPort & input, PortKind kind);
IOutputFormat::Status prepareMainAndPartialResult();
IOutputFormat::Status prepareTotalsAndExtremes();
size_t rows_read_before = 0;
bool are_totals_written = false;
bool is_partial_result_protocol_active = false;
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;
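A small standalone sketch (plain structs, not the real IOutputFormat) of the hand-over implemented in work() above: partial result chunks are forwarded until the main pipeline delivers its first non-empty chunk, then an empty chunk is emitted as a terminator and only main-pipeline data follows.
#include <cstddef>
#include <iostream>
struct Chunk { size_t rows = 0; };
struct OutputState
{
    bool main_input_activated = false;
    void consume(const Chunk & chunk) { std::cout << "main chunk, rows=" << chunk.rows << '\n'; }
    void consumePartialResult(const Chunk & chunk) { std::cout << "partial chunk, rows=" << chunk.rows << '\n'; }
    void onMainChunk(const Chunk & chunk)
    {
        if (!main_input_activated && chunk.rows > 0)
        {
            consume(Chunk{});           /// empty chunk signals: partial results are over
            main_input_activated = true;
        }
        consume(chunk);
    }
};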

View File

@ -134,7 +134,8 @@ void PrettyBlockOutputFormat::write(Chunk chunk, PortKind port_kind)
{
if (total_rows >= format_settings.pretty.max_rows)
{
total_rows += chunk.getNumRows();
if (port_kind != PortKind::PartialResult)
total_rows += chunk.getNumRows();
return;
}
if (mono_block)
@ -315,7 +316,8 @@ void PrettyBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port_kind
}
writeString(bottom_separator_s, out);
total_rows += num_rows;
if (port_kind != PortKind::PartialResult)
total_rows += num_rows;
}
@ -388,6 +390,34 @@ void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk)
write(std::move(chunk), PortKind::Extremes);
}
void PrettyBlockOutputFormat::clearLastLines(size_t lines_number)
{
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define MOVE_TO_PREV_LINE "\033[A"
#define CLEAR_TO_END_OF_LINE "\033[K"
static const char * clear_prev_line = MOVE_TO_PREV_LINE \
CLEAR_TO_END_OF_LINE;
/// Move cursor to the beginning of line
writeCString("\r", out);
for (size_t line = 0; line < lines_number; ++line)
{
writeCString(clear_prev_line, out);
}
}
void PrettyBlockOutputFormat::consumePartialResult(Chunk chunk)
{
if (prev_partial_block_rows > 0)
/// number of rows + header line + footer line
clearLastLines(prev_partial_block_rows + 2);
prev_partial_block_rows = chunk.getNumRows();
write(std::move(chunk), PortKind::PartialResult);
}
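A self-contained example of the ANSI escape trick used by clearLastLines(): move the cursor up one line and erase it, once per previously printed line, so the refreshed partial result table can be redrawn in place (on terminals that honour ANSI codes).
#include <cstddef>
#include <cstdio>
void clearLastLines(size_t lines_number)
{
    std::fputs("\r", stdout);                 /// move cursor to the beginning of the current line
    for (size_t i = 0; i < lines_number; ++i)
        std::fputs("\033[A\033[K", stdout);   /// cursor up + clear to the end of the line
}
int main()
{
    std::puts("row 1");
    std::puts("row 2");
    clearLastLines(2);                        /// erases both rows just printed
    std::puts("updated row");
}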
void PrettyBlockOutputFormat::writeMonoChunkIfNeeded()
{

View File

@ -28,7 +28,12 @@ protected:
void consumeTotals(Chunk) override;
void consumeExtremes(Chunk) override;
void clearLastLines(size_t lines_number);
void consumePartialResult(Chunk) override;
size_t total_rows = 0;
size_t prev_partial_block_rows = 0;
size_t row_number_width = 7; // "10000. "
const FormatSettings format_settings;
@ -55,6 +60,7 @@ protected:
void resetFormatterImpl() override
{
total_rows = 0;
prev_partial_block_rows = 0;
}
private:

View File

@ -194,7 +194,8 @@ void PrettyCompactBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind po
writeBottom(max_widths);
total_rows += num_rows;
if (port_kind != PortKind::PartialResult)
total_rows += num_rows;
}

View File

@ -14,8 +14,8 @@ class LazyOutputFormat : public IOutputFormat
{
public:
explicit LazyOutputFormat(const Block & header)
: IOutputFormat(header, out), queue(2) {}
explicit LazyOutputFormat(const Block & header, bool is_partial_result_protocol_active = false)
: IOutputFormat(header, out, is_partial_result_protocol_active), queue(2) {}
String getName() const override { return "LazyOutputFormat"; }
@ -49,6 +49,7 @@ protected:
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void consumePartialResult(Chunk chunk) override { consume(std::move(chunk)); }
private:

View File

@ -40,5 +40,10 @@ std::string IProcessor::statusToName(Status status)
UNREACHABLE();
}
ProcessorPtr IProcessor::getPartialResultProcessorPtr(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
return current_processor->getPartialResultProcessor(current_processor, partial_result_limit, partial_result_duration_ms);
}
}

View File

@ -164,6 +164,8 @@ public:
static std::string statusToName(Status status);
static ProcessorPtr getPartialResultProcessorPtr(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms);
/** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
*
* It may access input and output ports,
@ -235,6 +237,22 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName());
}
enum class PartialResultStatus
{
/// Processor currently doesn't support working with the partial result pipeline.
NotSupported,
/// Processor can be skipped in the partial result pipeline.
SkipSupported,
/// Processor creates a light-weight copy of itself in the partial result pipeline.
/// The copy can create snapshots of the original processor or transform small blocks of data in the same way as the original processor.
FullSupported,
};
virtual bool isPartialResultProcessor() const { return false; }
virtual PartialResultStatus getPartialResultProcessorSupportStatus() const { return PartialResultStatus::NotSupported; }
/// In case if query was cancelled executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }
@ -369,6 +387,11 @@ public:
protected:
virtual void onCancel() {}
virtual ProcessorPtr getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 /*partial_result_limit*/, UInt64 /*partial_result_duration_ms*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'getPartialResultProcessor' is not implemented for {} processor", getName());
}
private:
/// For:
/// - elapsed_us

View File

@ -1,5 +1,5 @@
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/LimitPartialResultTransform.h>
namespace DB
{
@ -180,7 +180,6 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
return Status::NeedData;
data.current_chunk = input.pull(true);
auto rows = data.current_chunk.getNumRows();
if (rows_before_limit_at_least && !data.input_port_has_counter)
@ -367,5 +366,11 @@ bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort
return true;
}
ProcessorPtr LimitTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
const auto & header = inputs.front().getHeader();
return std::make_shared<LimitPartialResultTransform>(header, partial_result_limit, partial_result_duration_ms, limit, offset);
}
}

View File

@ -55,6 +55,8 @@ private:
ColumnRawPtrs extractSortColumns(const Columns & columns) const;
bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const;
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
public:
LimitTransform(
const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams = 1,
@ -73,6 +75,8 @@ public:
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; }
PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
};
}

View File

@ -21,10 +21,14 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool cleanup_)
bool cleanup_,
size_t * cleanedup_rows_count_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes), cleanup(cleanup_)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes)
, cleanup(cleanup_)
, cleanedup_rows_count(cleanedup_rows_count_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
@ -74,10 +78,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// Write the data for the previous primary key.
if (!selected_row.empty())
{
if (is_deleted_column_number!=-1)
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
*cleanedup_rows_count += current_row_sources.size();
}
else
insertRow();
@ -91,7 +98,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
if ((is_deleted_column_number!=-1))
if (is_deleted_column_number != -1)
{
const UInt8 is_deleted = assert_cast<const ColumnUInt8 &>(*current->all_columns[is_deleted_column_number]).getData()[current->getRow()];
if ((is_deleted != 1) && (is_deleted != 0))
@ -129,10 +136,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
{
if (is_deleted_column_number!=-1)
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
*cleanedup_rows_count += current_row_sources.size();
}
else
insertRow();

View File

@ -27,7 +27,8 @@ public:
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false);
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr);
Status merge() override;
@ -37,6 +38,7 @@ private:
ssize_t is_deleted_column_number = -1;
ssize_t version_column_number = -1;
bool cleanup = false;
size_t * cleanedup_rows_count = nullptr;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.

View File

@ -19,7 +19,8 @@ public:
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false)
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
@ -31,7 +32,8 @@ public:
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes,
cleanup)
cleanup,
cleanedup_rows_count)
{
}

View File

@ -9,7 +9,12 @@ namespace DB
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
{
BuildQueryPipelineSettings settings;
settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes);
const auto & context_settings = from->getSettingsRef();
settings.partial_result_limit = context_settings.max_rows_in_partial_result;
settings.partial_result_duration_ms = context_settings.partial_result_update_duration_ms.totalMilliseconds();
settings.actions_settings = ExpressionActionsSettings::fromSettings(context_settings, CompileExpressions::yes);
settings.process_list_element = from->getProcessListElement();
settings.progress_callback = from->getProgressCallback();
return settings;

View File

@ -19,6 +19,9 @@ struct BuildQueryPipelineSettings
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
UInt64 partial_result_limit = 0;
UInt64 partial_result_duration_ms = 0;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);
};

View File

@ -44,11 +44,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t shard_num,
size_t shard_count,
size_t replica_num,
size_t replica_count,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
UUID group_uuid)
size_t shard_count)
{
checkStackSize();
@ -67,26 +63,6 @@ std::unique_ptr<QueryPlan> createLocalPlan(
.setShardInfo(static_cast<UInt32>(shard_num), static_cast<UInt32>(shard_count))
.ignoreASTOptimizations();
/// There are much things that are needed for coordination
/// during reading with parallel replicas
if (coordinator)
{
new_context->parallel_reading_coordinator = coordinator;
new_context->setClientInterface(ClientInfo::Interface::LOCAL);
new_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
new_context->setReplicaInfo(true, replica_count, replica_num);
new_context->setConnectionClientVersion(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH, DBMS_TCP_PROTOCOL_VERSION);
new_context->setParallelReplicasGroupUUID(group_uuid);
new_context->setMergeTreeAllRangesCallback([coordinator](InitialAllRangesAnnouncement announcement)
{
coordinator->handleInitialAllRangesAnnouncement(announcement);
});
new_context->setMergeTreeReadTaskCallback([coordinator](ParallelReadRequest request) -> std::optional<ParallelReadResponse>
{
return coordinator->handleRequest(request);
});
}
if (context->getSettingsRef().allow_experimental_analyzer)
{
auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options);

View File

@ -19,10 +19,5 @@ std::unique_ptr<QueryPlan> createLocalPlan(
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t shard_num,
size_t shard_count,
size_t replica_num,
size_t replica_count,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
UUID group_uuid = UUIDHelpers::Nil);
size_t shard_count);
}

View File

@ -168,6 +168,8 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
QueryPipelineBuilderPtr last_pipeline;
bool has_partial_result_setting = build_pipeline_settings.partial_result_duration_ms > 0;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
@ -194,6 +196,9 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
if (has_partial_result_setting && last_pipeline && !last_pipeline->isPartialResultActive())
last_pipeline->activatePartialResult(build_pipeline_settings.partial_result_limit, build_pipeline_settings.partial_result_duration_ms);
}
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);

View File

@ -1226,6 +1226,7 @@ static void buildIndexes(
std::optional<ReadFromMergeTree::Indexes> & indexes,
ActionsDAGPtr filter_actions_dag,
const MergeTreeData & data,
const MergeTreeData::DataPartsVector & parts,
const ContextPtr & context,
const SelectQueryInfo & query_info,
const StorageMetadataPtr & metadata_snapshot)
@ -1248,7 +1249,7 @@ static void buildIndexes(
context,
primary_key_column_names,
primary_key.expression,
array_join_name_set}, {}, {}, {}, false});
array_join_name_set}, {}, {}, {}, false, {}});
}
else
{
@ -1256,7 +1257,7 @@ static void buildIndexes(
query_info,
context,
primary_key_column_names,
primary_key.expression}, {}, {}, {}, false});
primary_key.expression}, {}, {}, {}, false, {}});
}
if (metadata_snapshot->hasPartitionKey())
@ -1269,6 +1270,9 @@ static void buildIndexes(
indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
}
/// TODO Support row_policy_filter and additional_filters
indexes->part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, filter_actions_dag, context);
indexes->use_skip_indexes = settings.use_skip_indexes;
bool final = query_info.isFinal();
@ -1346,7 +1350,7 @@ static void buildIndexes(
void ReadFromMergeTree::applyFilters()
{
auto filter_actions_dag = buildFilterDAG(context, prewhere_info, filter_nodes, query_info);
buildIndexes(indexes, filter_actions_dag, data, context, query_info, metadata_for_reading);
buildIndexes(indexes, filter_actions_dag, data, prepared_parts, context, query_info, metadata_for_reading);
}
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
@ -1424,11 +1428,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
size_t total_parts = parts.size();
/// TODO Support row_policy_filter and additional_filters
auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context);
if (part_values && part_values->empty())
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
result.column_names_to_read = real_column_names;
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
@ -1443,7 +1442,10 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
const Names & primary_key_column_names = primary_key.column_names;
if (!indexes)
buildIndexes(indexes, query_info.filter_actions_dag, data, context, query_info, metadata_snapshot);
buildIndexes(indexes, query_info.filter_actions_dag, data, parts, context, query_info, metadata_snapshot);
if (indexes->part_values && indexes->part_values->empty())
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
if (settings.force_primary_key && indexes->key_condition.alwaysUnknownOrTrue())
{
@ -1467,7 +1469,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
indexes->minmax_idx_condition,
parts,
alter_conversions,
part_values,
indexes->part_values,
metadata_snapshot_base,
data,
context,

View File

@ -171,6 +171,7 @@ public:
std::optional<KeyCondition> minmax_idx_condition;
UsefulSkipIndexes skip_indexes;
bool use_skip_indexes;
std::optional<std::unordered_set<String>> part_values;
};
static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(

View File

@ -187,7 +187,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(
query, header, my_context, my_stage, my_shard.shard_info.shard_num, my_shard_count, 0, 0, /*coordinator=*/nullptr);
query, header, my_context, my_stage, my_shard.shard_info.shard_num, my_shard_count);
return std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(my_context),
@ -245,6 +245,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
LOG_INFO(log, "cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is used: {}",
cluster_for_parallel_replicas, cluster_name);
}
LOG_TRACE(&Poco::Logger::get("ReadFromRemote"), "Setting `cluster_for_parallel_replicas` to {}", cluster_name);
context->setSetting("cluster_for_parallel_replicas", cluster_name);
}

View File

@ -57,7 +57,7 @@ private:
std::shared_ptr<const StorageLimitsList> storage_limits;
Poco::Logger * log;
UInt32 shard_count;
String cluster_name;
const String cluster_name;
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);

View File

@ -27,6 +27,8 @@ public:
size_t max_bytes_before_external_sort = 0;
TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0;
UInt64 partial_result_limit = 0;
UInt64 partial_result_duration_ms = 0;
explicit Settings(const Context & context);
explicit Settings(size_t max_block_size_);

View File

@ -0,0 +1,47 @@
#include <Processors/Transforms/AggregatingPartialResultTransform.h>
namespace DB
{
AggregatingPartialResultTransform::AggregatingPartialResultTransform(
const Block & input_header, const Block & output_header, AggregatingTransformPtr aggregating_transform_,
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
: PartialResultTransform(input_header, output_header, partial_result_limit_, partial_result_duration_ms_)
, aggregating_transform(std::move(aggregating_transform_))
, transform_aggregator(input_header, aggregating_transform->params->params)
{}
void AggregatingPartialResultTransform::transformPartialResult(Chunk & chunk)
{
auto & params = aggregating_transform->params->params;
bool no_more_keys = false;
AggregatedDataVariants variants;
ColumnRawPtrs key_columns(params.keys_size);
Aggregator::AggregateColumns aggregate_columns(params.aggregates_size);
const UInt64 num_rows = chunk.getNumRows();
transform_aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys);
auto transformed_block = transform_aggregator.convertToBlocks(variants, /*final*/ true, /*max_threads*/ 1).front();
chunk = convertToChunk(transformed_block);
}
PartialResultTransform::ShaphotResult AggregatingPartialResultTransform::getRealProcessorSnapshot()
{
std::lock_guard lock(aggregating_transform->snapshot_mutex);
if (aggregating_transform->is_generate_initialized)
return {{}, SnaphotStatus::Stopped};
if (aggregating_transform->variants.empty())
return {{}, SnaphotStatus::NotReady};
auto & snapshot_aggregator = aggregating_transform->params->aggregator;
auto & snapshot_variants = aggregating_transform->many_data->variants;
auto block = snapshot_aggregator.prepareBlockAndFillWithoutKeySnapshot(*snapshot_variants.at(0));
return {convertToChunk(block), SnaphotStatus::Ready};
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Interpreters/Aggregator.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/PartialResultTransform.h>
namespace DB
{
class AggregatingPartialResultTransform : public PartialResultTransform
{
public:
using AggregatingTransformPtr = std::shared_ptr<AggregatingTransform>;
AggregatingPartialResultTransform(
const Block & input_header, const Block & output_header, AggregatingTransformPtr aggregating_transform_,
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_);
String getName() const override { return "AggregatingPartialResultTransform"; }
void transformPartialResult(Chunk & chunk) override;
ShaphotResult getRealProcessorSnapshot() override;
private:
AggregatingTransformPtr aggregating_transform;
Aggregator transform_aggregator;
};
}

View File

@ -1,3 +1,4 @@
#include <Processors/Transforms/AggregatingPartialResultTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Formats/NativeReader.h>
@ -657,6 +658,8 @@ void AggregatingTransform::consume(Chunk chunk)
src_rows += num_rows;
src_bytes += chunk.bytes();
std::lock_guard lock(snapshot_mutex);
if (params->params.only_merge)
{
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
@ -676,6 +679,7 @@ void AggregatingTransform::initGenerate()
if (is_generate_initialized)
return;
std::lock_guard lock(snapshot_mutex);
is_generate_initialized = true;
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
@ -806,4 +810,12 @@ void AggregatingTransform::initGenerate()
}
}
ProcessorPtr AggregatingTransform::getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
const auto & input_header = inputs.front().getHeader();
const auto & output_header = outputs.front().getHeader();
auto aggregating_processor = std::dynamic_pointer_cast<AggregatingTransform>(current_processor);
return std::make_shared<AggregatingPartialResultTransform>(input_header, output_header, std::move(aggregating_processor), partial_result_limit, partial_result_duration_ms);
}
}

View File

@ -170,9 +170,23 @@ public:
void work() override;
Processors expandPipeline() override;
PartialResultStatus getPartialResultProcessorSupportStatus() const override
{
/// Currently AggregatingPartialResultTransform supports only single-threaded aggregation without keys.
/// TODO: check that aggregator.prepareBlockAndFillWithoutKey returns results without changing
/// the aggregator state once aggregation with keys is supported in AggregatingPartialResultTransform.
bool is_partial_result_supported = params->params.keys_size == 0 /// Aggregation without key.
&& many_data->variants.size() == 1; /// Use only one stream for aggregation.
return is_partial_result_supported ? PartialResultStatus::FullSupported : PartialResultStatus::NotSupported;
}
protected:
void consume(Chunk chunk);
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
private:
/// To read the data that was flushed into the temporary data file.
Processors processors;
@ -212,6 +226,13 @@ private:
bool is_consume_started = false;
friend class AggregatingPartialResultTransform;
/// The mutex protects variables that are used for creating a snapshot of the current processor.
/// The current implementation of AggregatingPartialResultTransform uses the 'is_generate_initialized' variable to check
/// whether the processor has started sending data through the main pipeline, in which case the corresponding partial result processor should stop creating snapshots.
/// Additionally, the mutex protects the 'params->aggregator' and 'many_data->variants' variables, which are used to obtain data for a snapshot.
std::mutex snapshot_mutex;
void initGenerate();
};
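For context: the fields declared above define a simple handshake. consume() and initGenerate() mutate state under snapshot_mutex, and once is_generate_initialized is set the partial result side must report a stopped snapshot instead of reading the state. Below is a hedged, self-contained sketch of that protocol; State, takeSnapshot and the Ready/Stopped enum are illustrative names, not the real processor interface.

#include <cassert>
#include <mutex>
#include <optional>
#include <utility>

enum class SnapshotStatus { Ready, Stopped };

struct State
{
    std::mutex snapshot_mutex;
    bool is_generate_initialized = false;
    long long aggregated_value = 0;
};

/// Main-pipeline side: all mutations happen under the mutex.
void consume(State & s, long long delta)
{
    std::lock_guard lock(s.snapshot_mutex);
    s.aggregated_value += delta;
}

void initGenerate(State & s)
{
    std::lock_guard lock(s.snapshot_mutex);
    s.is_generate_initialized = true; /// from now on, snapshots must stop
}

/// Partial-result side: either a copy of the state or a signal to stop snapshotting.
std::pair<std::optional<long long>, SnapshotStatus> takeSnapshot(State & s)
{
    std::lock_guard lock(s.snapshot_mutex);
    if (s.is_generate_initialized)
        return {std::nullopt, SnapshotStatus::Stopped};
    return {s.aggregated_value, SnapshotStatus::Ready};
}

int main()
{
    State s;
    consume(s, 42);
    assert(takeSnapshot(s).second == SnapshotStatus::Ready);
    initGenerate(s);
    assert(takeSnapshot(s).second == SnapshotStatus::Stopped);
}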

View File

@ -25,6 +25,12 @@ void ExpressionTransform::transform(Chunk & chunk)
chunk.setColumns(block.getColumns(), num_rows);
}
ProcessorPtr ExpressionTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 /*partial_result_limit*/, UInt64 /*partial_result_duration_ms*/)
{
const auto & header = getInputPort().getHeader();
return std::make_shared<ExpressionTransform>(header, expression);
}
ConvertingTransform::ConvertingTransform(const Block & header_, ExpressionActionsPtr expression_)
: ExceptionKeepingTransform(header_, ExpressionTransform::transformHeader(header_, expression_->getActionsDAG()))
, expression(std::move(expression_))

View File

@ -26,10 +26,15 @@ public:
static Block transformHeader(Block header, const ActionsDAG & expression);
PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
protected:
void transform(Chunk & chunk) override;
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
private:
ExpressionActionsPtr expression;
};
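For context: the PartialResultStatus value declared on each transform (FullSupported here, conditional on AggregatingTransform, SkipSupported on LimitsCheckingTransform below) is what the pipeline builder can branch on. The following is a hedged sketch of how such a three-way decision might be consumed; buildPartialResultBranch and the printed actions are hypothetical, only the three status values mirror the declarations in this diff.

#include <iostream>

enum class PartialResultStatus
{
    NotSupported,   /// e.g. multi-threaded keyed aggregation: no partial results for the query
    SkipSupported,  /// e.g. LimitsCheckingTransform: no twin processor needed, chunks pass through
    FullSupported,  /// e.g. ExpressionTransform: create a twin processor for the partial-result branch
};

/// Hypothetical helper showing how a pipeline builder might react to the status.
void buildPartialResultBranch(PartialResultStatus status)
{
    switch (status)
    {
        case PartialResultStatus::NotSupported:
            std::cout << "drop the partial-result branch for the whole query\n";
            break;
        case PartialResultStatus::SkipSupported:
            std::cout << "keep the branch, add no extra processor for this step\n";
            break;
        case PartialResultStatus::FullSupported:
            std::cout << "insert a partial-result twin of this processor into the branch\n";
            break;
    }
}

int main()
{
    buildPartialResultBranch(PartialResultStatus::FullSupported);  /// ExpressionTransform
    buildPartialResultBranch(PartialResultStatus::SkipSupported);  /// LimitsCheckingTransform
}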

View File

@ -0,0 +1,42 @@
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/LimitPartialResultTransform.h>
namespace DB
{
LimitPartialResultTransform::LimitPartialResultTransform(
const Block & header,
UInt64 partial_result_limit_,
UInt64 partial_result_duration_ms_,
UInt64 limit_,
UInt64 offset_)
: PartialResultTransform(header, partial_result_limit_, partial_result_duration_ms_)
, limit(limit_)
, offset(offset_)
{}
void LimitPartialResultTransform::transformPartialResult(Chunk & chunk)
{
UInt64 num_rows = chunk.getNumRows();
if (num_rows < offset || limit == 0)
{
chunk = {};
return;
}
UInt64 length = std::min(limit, num_rows - offset);
/// Check if some rows should be removed
if (length < num_rows)
{
UInt64 num_columns = chunk.getNumColumns();
auto columns = chunk.detachColumns();
for (UInt64 i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(offset, length);
chunk.setColumns(std::move(columns), length);
}
}
}
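A worked illustration of the trimming arithmetic above: a self-contained sketch that treats a plain vector as a one-column chunk. cutColumn is an illustrative stand-in for IColumn::cut, and applyLimitOffset repeats the same offset/limit math as transformPartialResult.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using Column = std::vector<int>;

/// Illustrative stand-in for IColumn::cut(offset, length).
Column cutColumn(const Column & column, uint64_t offset, uint64_t length)
{
    return Column(column.begin() + offset, column.begin() + offset + length);
}

/// Same arithmetic as LimitPartialResultTransform::transformPartialResult.
Column applyLimitOffset(Column column, uint64_t limit, uint64_t offset)
{
    uint64_t num_rows = column.size();
    if (num_rows < offset || limit == 0)
        return {}; /// nothing survives: the whole chunk is discarded

    uint64_t length = std::min(limit, num_rows - offset);
    if (length < num_rows)
        column = cutColumn(column, offset, length);
    return column;
}

int main()
{
    Column chunk = {1, 2, 3, 4, 5, 6};
    for (int x : applyLimitOffset(chunk, /*limit*/ 3, /*offset*/ 2))
        std::cout << x << ' '; /// prints: 3 4 5
    std::cout << '\n';
}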

View File

@ -0,0 +1,36 @@
#pragma once
#include <Processors/Transforms/PartialResultTransform.h>
namespace DB
{
class LimitTransform;
/// Currently supports only a single-threaded implementation with one input and one output port
class LimitPartialResultTransform : public PartialResultTransform
{
public:
using LimitTransformPtr = std::shared_ptr<LimitTransform>;
LimitPartialResultTransform(
const Block & header,
UInt64 partial_result_limit_,
UInt64 partial_result_duration_ms_,
UInt64 limit_,
UInt64 offset_);
String getName() const override { return "LimitPartialResultTransform"; }
void transformPartialResult(Chunk & chunk) override;
/// LimitTransform doesn't have a state that can be snapshotted
ShaphotResult getRealProcessorSnapshot() override { return {{}, SnaphotStatus::Stopped}; }
private:
UInt64 limit;
UInt64 offset;
LimitTransformPtr limit_transform;
};
}

View File

@ -1,4 +1,5 @@
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/PartialResultTransform.h>
#include <Access/EnabledQuota.h>
namespace DB

View File

@ -33,6 +33,8 @@ public:
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) { quota = quota_; }
PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::SkipSupported; }
protected:
void transform(Chunk & chunk) override;

Some files were not shown because too many files have changed in this diff