Merge branch 'master' into harmful

This commit is contained in:
Alexey Milovidov 2021-01-04 04:06:46 +03:00
commit 38d982b199
163 changed files with 3007 additions and 1663 deletions

View File

@ -68,7 +68,15 @@ include (cmake/find/ccache.cmake)
option(ENABLE_CHECK_HEAVY_BUILDS "Don't allow C++ translation units to compile too long or to take too much memory while compiling" OFF)
if (ENABLE_CHECK_HEAVY_BUILDS)
set (CMAKE_CXX_COMPILER_LAUNCHER prlimit --rss=10000000 --cpu=600)
# set DATA (since RSS does not work since 2.6.x+) to 2G
set (RLIMIT_DATA 5000000000)
# set VIRT (RLIMIT_AS) to 10G (DATA*10)
set (RLIMIT_AS 10000000000)
# gcc10/gcc10/clang -fsanitize=memory is too heavy
if (SANITIZE STREQUAL "memory" OR COMPILER_GCC)
set (RLIMIT_DATA 10000000000)
endif()
set (CMAKE_CXX_COMPILER_LAUNCHER prlimit --as=${RLIMIT_AS} --data=${RLIMIT_DATA} --cpu=600)
endif ()
if (NOT CMAKE_BUILD_TYPE OR CMAKE_BUILD_TYPE STREQUAL "None")
@ -187,13 +195,14 @@ endif ()
option(ADD_GDB_INDEX_FOR_GOLD "Add .gdb-index to resulting binaries for gold linker.")
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
if (LINKER_NAME STREQUAL "lld")
# Can be lld or ld-lld.
if (LINKER_NAME MATCHES "lld$")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gdb-index")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--gdb-index")
message (STATUS "Adding .gdb-index via --gdb-index linker option.")
# we use another tool for gdb-index, because gold linker removes section .debug_aranges, which used inside clickhouse stacktraces
# http://sourceware-org.1504.n7.nabble.com/gold-No-debug-aranges-section-when-linking-with-gdb-index-td540965.html#a556932
elseif (LINKER_NAME STREQUAL "gold" AND ADD_GDB_INDEX_FOR_GOLD)
elseif (LINKER_NAME MATCHES "gold$" AND ADD_GDB_INDEX_FOR_GOLD)
find_program (GDB_ADD_INDEX_EXE NAMES "gdb-add-index" DOC "Path to gdb-add-index executable")
if (NOT GDB_ADD_INDEX_EXE)
set (USE_GDB_ADD_INDEX 0)

View File

@ -31,7 +31,6 @@ set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
add_subdirectory (antlr4-runtime-cmake)
add_subdirectory (boost-cmake)
add_subdirectory (cctz-cmake)
add_subdirectory (consistent-hashing-sumbur)
add_subdirectory (consistent-hashing)
add_subdirectory (dragonbox-cmake)
add_subdirectory (FastMemcpy)

View File

@ -1,2 +0,0 @@
add_library(consistent-hashing-sumbur sumbur.cpp)
target_include_directories(consistent-hashing-sumbur PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@ -1,113 +0,0 @@
//Copyright (c) 2011-2012 Mail.RU
//Copyright (c) 2011-2012 Maksim Kalinchenko
//Copyright (c) 2012 Sokolov Yura aka funny-falcon
//
//MIT License
//
//Permission is hereby granted, free of charge, to any person obtaining
//a copy of this software and associated documentation files (the
//"Software"), to deal in the Software without restriction, including
//without limitation the rights to use, copy, modify, merge, publish,
//distribute, sublicense, and/or sell copies of the Software, and to
//permit persons to whom the Software is furnished to do so, subject to
//the following conditions:
//
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
//MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
//LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
//OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
//WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <stdexcept>
#define L 0xFFFFFFFF
static unsigned int L27_38[] = {L / 27, L / 28, L / 29, L / 30, L / 31, L / 32,
L / 33, L / 34, L / 35, L / 36, L / 37, L / 38,
L / 39, L / 40, L / 41, L / 42, L / 43, L / 44,
L / 45, L / 46, L / 47, L / 48, L / 49, L / 50,
L / 51, L / 52, L / 53, L / 54, L / 55, L / 56,
L / 57, L / 58, L / 59, L / 60, L / 61, L / 62
};
static unsigned int LL27_38[] = {L/(26*27), L/(27*28), L/(28*29), L/(29*30), L/(30*31), L/(31*32),
L/(32*33), L/(33*34), L/(34*35), L/(35*36), L/(36*37), L/(37*38),
L/(38*39), L/(39*40), L/(40*41), L/(41*42), L/(42*43), L/(43*44),
L/(44*45), L/(45*46), L/(46*47), L/(47*48), L/(48*49), L/(49*50),
L/(50*51), L/(51*52), L/(52*53), L/(53*54), L/(54*55), L/(55*56),
L/(56*57), L/(57*58), L/(58*59), L/(59*60), L/(60*61), L/(61*62)
};
unsigned int sumburConsistentHash(unsigned int hashed_int, unsigned int capacity)
{
unsigned int h = hashed_int;
unsigned int capa = capacity;
unsigned int part, n, i, c;
if (capa == 0)
throw std::runtime_error("Sumbur is not applicable to empty cluster");
part = L / capa;
if (L - h < part) return 0;
n = 1;
do {
if (h >= L / 2) h -= L / 2;
else {
n = 2;
if (L / 2 - h < part) return 1;
}
if (capa == 2) return 1;
#define curslice(i) (L / (i * (i - 1)))
#define unroll(i) \
if (curslice(i) <= h) h -= curslice(i); \
else { \
h += curslice(i) * (i - n - 1); \
n = i; \
if (L / i - h < part) return n-1; \
} \
if (capa == i) return (n-1)
unroll(3); unroll(4); unroll(5);
unroll(6); unroll(7); unroll(8);
unroll(9); unroll(10); unroll(11);
unroll(12); unroll(13); unroll(14);
unroll(15); unroll(16); unroll(17);
unroll(18); unroll(19); unroll(20);
unroll(21); unroll(22); unroll(23);
unroll(24); unroll(25); unroll(26);
for (i = 27; i <= capa && i <= 62; i++) {
c = LL27_38[i-27];
if (c <= h) {
h -= c;
}
else {
h += c * (i - n - 1);
n = i;
if (L27_38[i-27] - h < part) return n-1;
}
}
for(i = 63; i <= capa; i++) {
c = L / (i * (i - 1));
if (c <= h) {
h -= c;
}
else {
h += c * (i - n - 1);
n = i;
if (L / i - h < part) return n - 1;
}
}
} while(false);
return n - 1;
}

View File

@ -1,28 +0,0 @@
//Copyright (c) 2011-2012 Mail.RU
//Copyright (c) 2011-2012 Maksim Kalinchenko
//Copyright (c) 2012 Sokolov Yura aka funny-falcon
//
//MIT License
//
//Permission is hereby granted, free of charge, to any person obtaining
//a copy of this software and associated documentation files (the
//"Software"), to deal in the Software without restriction, including
//without limitation the rights to use, copy, modify, merge, publish,
//distribute, sublicense, and/or sell copies of the Software, and to
//permit persons to whom the Software is furnished to do so, subject to
//the following conditions:
//
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
//MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
//LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
//OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
//WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
/// Source code: https://github.com/mailru/sumbur-ruby/blob/master/ext/sumbur/sumbur.c
unsigned int sumburConsistentHash(unsigned int hashed_int, unsigned int capacity);

View File

@ -31,7 +31,7 @@ find . -name '*.so.*' -print -exec mv '{}' /output \;
if [ "performance" == "$COMBINED_OUTPUT" ]
then
cp -r ../tests/performance /output
cp -r ../tests/config/top_level_domains /
cp -r ../tests/config/top_level_domains /output
cp -r ../docker/test/performance-comparison/config /output ||:
rm /output/unit_tests_dbms ||:
rm /output/clickhouse-odbc-bridge ||:

View File

@ -36,6 +36,22 @@ function wait_for_server # port, pid
fi
}
function left_or_right()
{
local from=$1 && shift
local basename=$1 && shift
if [ -e "$from/$basename" ]; then
echo "$from/$basename"
return
fi
case "$from" in
left) echo "right/$basename" ;;
right) echo "left/$basename" ;;
esac
}
function configure
{
# Use the new config for both servers, so that we can change it in a PR.
@ -55,7 +71,7 @@ function configure
# server *config* directives overrides
--path db0
--user_files_path db0/user_files
--top_level_domains_path /top_level_domains
--top_level_domains_path "$(left_or_right right top_level_domains)"
--tcp_port $LEFT_SERVER_PORT
)
left/clickhouse-server "${setup_left_server_opts[@]}" &> setup-server-log.log &
@ -103,7 +119,7 @@ function restart
# server *config* directives overrides
--path left/db
--user_files_path left/db/user_files
--top_level_domains_path /top_level_domains
--top_level_domains_path "$(left_or_right left top_level_domains)"
--tcp_port $LEFT_SERVER_PORT
)
left/clickhouse-server "${left_server_opts[@]}" &>> left-server-log.log &
@ -118,7 +134,7 @@ function restart
# server *config* directives overrides
--path right/db
--user_files_path right/db/user_files
--top_level_domains_path /top_level_domains
--top_level_domains_path "$(left_or_right right top_level_domains)"
--tcp_port $RIGHT_SERVER_PORT
)
right/clickhouse-server "${right_server_opts[@]}" &>> right-server-log.log &

View File

@ -23,9 +23,28 @@ $ sudo apt-get install git cmake python ninja-build
Or cmake3 instead of cmake on older systems.
### Install clang-11 (recommended) {#install-clang-11}
On Ubuntu/Debian you can use the automatic installation script (check [official webpage](https://apt.llvm.org/))
```bash
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
```
For other Linux distribution - check the availability of the [prebuild packages](https://releases.llvm.org/download.html) or build clang [from sources](https://clang.llvm.org/get_started.html).
#### Use clang-11 for Builds {#use-gcc-10-for-builds}
``` bash
$ export CC=clang-11
$ export CXX=clang++-11
```
### Install GCC 10 {#install-gcc-10}
There are several ways to do this.
We recommend building ClickHouse with clang-11, GCC-10 also supported, but it is not used for production builds.
If you want to use GCC-10 there are several ways to install it.
#### Install from Repository {#install-from-repository}
@ -49,7 +68,7 @@ $ sudo apt-get install gcc-10 g++-10
See [utils/ci/build-gcc-from-sources.sh](https://github.com/ClickHouse/ClickHouse/blob/master/utils/ci/build-gcc-from-sources.sh)
### Use GCC 10 for Builds {#use-gcc-10-for-builds}
#### Use GCC 10 for Builds {#use-gcc-10-for-builds}
``` bash
$ export CC=gcc-10

View File

@ -576,6 +576,35 @@ For more information, see the MergeTreeSettings.h header file.
</merge_tree>
```
## metric_log {#metric_log}
It is enabled by default. If it`s not, you can do this manually.
**Enabling**
To manually turn on metrics history collection [`system.metric_log`](../../operations/system-tables/metric_log.md), create `/etc/clickhouse-server/config.d/metric_log.xml` with the following content:
``` xml
<yandex>
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
</yandex>
```
**Disabling**
To disable `metric_log` setting, you should create the following file `/etc/clickhouse-server/config.d/disable_metric_log.xml` with the following content:
``` xml
<yandex>
<metric_log remove="1" />
</yandex>
```
## replicated_merge_tree {#server_configuration_parameters-replicated_merge_tree}
Fine tuning for tables in the [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/mergetree.md).

View File

@ -2,19 +2,6 @@
Contains history of metrics values from tables `system.metrics` and `system.events`, periodically flushed to disk.
To turn on metrics history collection on `system.metric_log`, create `/etc/clickhouse-server/config.d/metric_log.xml` with following content:
``` xml
<yandex>
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
</yandex>
```
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
@ -55,6 +42,7 @@ CurrentMetric_DistributedFilesToInsert: 0
**See also**
- [metric_log setting](../../operations/server-configuration-parameters/settings.md#metric_log) — Enabling and disabling the setting.
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md) — Contains periodically calculated metrics.
- [system.events](../../operations/system-tables/events.md#system_tables-events) — Contains a number of events that occurred.
- [system.metrics](../../operations/system-tables/metrics.md) — Contains instantly calculated metrics.

View File

@ -182,6 +182,14 @@ If `NULL` is passed to the function as input, then it returns the `Nullable(Noth
Gets the size of the block.
In ClickHouse, queries are always run on blocks (sets of column parts). This function allows getting the size of the block that you called it for.
## byteSize(...) {#function-bytesize}
Get an estimate of uncompressed byte size of its arguments in memory.
E.g. for UInt32 argument it will return constant 4, for String argument - the string length + 9 (terminating zero + length).
The function can take multiple arguments. The typical application is byteSize(*).
Use case: Suppose you have a service that stores data for multiple clients in one table. Users will pay per data volume. So, you need to implement accounting of users data volume. The function will allow to calculate the data size on per-row basis.
## materialize(x) {#materializex}
Turns a constant into a full column containing just one value.

View File

@ -5,10 +5,12 @@ toc_title: mysql
# mysql {#mysql}
Allows `SELECT` queries to be performed on data that is stored on a remote MySQL server.
Allows `SELECT` and `INSERT` queries to be performed on data that is stored on a remote MySQL server.
**Syntax**
``` sql
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
```
**Parameters**
@ -23,13 +25,15 @@ mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_
- `password` — User password.
- `replace_query` — Flag that converts `INSERT INTO` queries to `REPLACE INTO`. If `replace_query=1`, the query is replaced.
- `replace_query` — Flag that converts `INSERT INTO` queries to `REPLACE INTO`. Possible values:
- `0` - The query is executed as `INSERT INTO`.
- `1` - The query is executed as `REPLACE INTO`.
- `on_duplicate_clause` — The `ON DUPLICATE KEY on_duplicate_clause` expression that is added to the `INSERT` query.
- `on_duplicate_clause` — The `ON DUPLICATE KEY on_duplicate_clause` expression that is added to the `INSERT` query. Can be specified only with `replace_query = 0` (if you simultaneously pass `replace_query = 1` and `on_duplicate_clause`, ClickHouse generates an exception).
Example: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`, where `on_duplicate_clause` is `UPDATE c2 = c2 + 1`. See the MySQL documentation to find which `on_duplicate_clause` you can use with the `ON DUPLICATE KEY` clause.
Example: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1;`
To specify `on_duplicate_clause` you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `on_duplicate_clause`, ClickHouse generates an exception.
`on_duplicate_clause` here is `UPDATE c2 = c2 + 1`. See the MySQL documentation to find which `on_duplicate_clause` you can use with the `ON DUPLICATE KEY` clause.
Simple `WHERE` clauses such as `=, !=, >, >=, <, <=` are currently executed on the MySQL server.
@ -39,46 +43,59 @@ The rest of the conditions and the `LIMIT` sampling constraint are executed in C
A table object with the same columns as the original MySQL table.
## Usage Example {#usage-example}
!!! info "Note"
In the `INSERT` query to distinguish table function `mysql(...)` from table name with column names list you must use keywords `FUNCTION` or `TABLE FUNCTION`. See examples below.
**Examples**
Table in MySQL:
``` text
mysql> CREATE TABLE `test`.`test` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `int_nullable` INT NULL DEFAULT NULL,
-> `float` FLOAT NOT NULL,
-> `float_nullable` FLOAT NULL DEFAULT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> INSERT INTO test (`int_id`, `float`) VALUES (1,2);
mysql> select * from test;
+------+----------+-----+----------+
| int_id | int_nullable | float | float_nullable |
+------+----------+-----+----------+
| 1 | NULL | 2 | NULL |
+------+----------+-----+----------+
1 row in set (0,00 sec)
mysql> SELECT * FROM test;
+--------+-------+
| int_id | float |
+--------+-------+
| 1 | 2 |
+--------+-------+
```
Selecting data from ClickHouse:
``` sql
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123')
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123');
```
``` text
┌─int_id─┬─int_nullable─┬─float─┬─float_nullable─┐
│ 1 │ ᴺᵁᴸᴸ │ 2 │ ᴺᵁᴸᴸ │
└────────┴──────────────┴───────┴────────────────
┌─int_id─┬─float─┐
│ 1 │ 2 │
└────────┴───────┘
```
## See Also {#see-also}
Replacing and inserting:
```sql
INSERT INTO FUNCTION mysql('localhost:3306', 'test', 'test', 'bayonet', '123', 1) (int_id, float) VALUES (1, 3);
INSERT INTO TABLE FUNCTION mysql('localhost:3306', 'test', 'test', 'bayonet', '123', 0, 'UPDATE int_id = int_id + 1') (int_id, float) VALUES (1, 4);
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123');
```
``` text
┌─int_id─┬─float─┐
│ 1 │ 3 │
│ 2 │ 4 │
└────────┴───────┘
```
**See Also**
- [The MySQL table engine](../../engines/table-engines/integrations/mysql.md)
- [Using MySQL as a source of external dictionary](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md#dicts-external_dicts_dict_sources-mysql)
[Original article](https://clickhouse.tech/docs/en/query_language/table_functions/mysql/) <!--hide-->
[Original article](https://clickhouse.tech/docs/en/sql-reference/table_functions/mysql/) <!--hide-->

View File

@ -577,6 +577,35 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
</merge_tree>
```
## metric_log {#metric_log}
Эта настройка включена по умолчанию. Если это не так, вы можете включить ее сами.
**Включение**
Чтобы вручную включить сбор истории метрик в таблице [`system.metric_log`](../../operations/system-tables/metric_log.md), создайте `/etc/clickhouse-server/config.d/metric_log.xml` следующего содержания:
``` xml
<yandex>
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
</yandex>
```
**Выключение**
Чтобы отключить настройку `metric_log` , создайте файл `/etc/clickhouse-server/config.d/disable_metric_log.xml` следующего содержания:
``` xml
<yandex>
<metric_log remove="1" />
</yandex>
```
## replicated\_merge\_tree {#server_configuration_parameters-replicated_merge_tree}
Тонкая настройка таблиц в [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/mergetree.md).

View File

@ -2,19 +2,6 @@
Содержит историю значений метрик из таблиц `system.metrics` и `system.events`, периодически сбрасываемую на диск.
Для включения сбора истории метрик в таблице `system.metric_log` создайте `/etc/clickhouse-server/config.d/metric_log.xml` следующего содержания:
``` xml
<yandex>
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
</yandex>
```
Столбцы:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — дата события.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — время события.
@ -55,6 +42,7 @@ CurrentMetric_ReplicatedChecks: 0
**Смотрите также**
- [Настройка metric_log](../../operations/server-configuration-parameters/settings.md#metric_log) — как включить и выключить запись истории.
- [system.asynchronous_metrics](#system_tables-asynchronous_metrics) — таблица с периодически вычисляемыми метриками.
- [system.events](#system_tables-events) — таблица с количеством произошедших событий.
- [system.metrics](#system_tables-metrics) — таблица с мгновенно вычисляемыми метриками.

View File

@ -31,9 +31,11 @@ then
git add *
git add ".nojekyll"
# Push to GitHub rewriting the existing contents.
git commit --quiet -m "Add new release at $(date)"
git push --force origin master
# Push to GitHub rewriting the existing contents.
# Sometimes it does not work with error message "! [remote rejected] master -> master (cannot lock ref 'refs/heads/master': is at 42a0f6b6b6c7be56a469441b4bf29685c1cebac3 but expected 520e9b02c0d4678a2a5f41d2f561e6532fb98cc1)"
for _ in {1..10}; do git push --force origin master && break; sleep 5; done
if [[ ! -z "${CLOUDFLARE_TOKEN}" ]]
then

View File

@ -948,7 +948,7 @@ private:
{ /// disable logs if expects errors
TestHint test_hint(test_mode, all_queries_text);
if (test_hint.clientError() || test_hint.serverError())
processTextAsSingleQuery("SET send_logs_level = 'none'");
processTextAsSingleQuery("SET send_logs_level = 'fatal'");
// Echo all queries if asked; makes for a more readable reference
// file.
@ -1934,7 +1934,12 @@ private:
if (has_vertical_output_suffix)
current_format = "Vertical";
block_out_stream = context.getOutputFormat(current_format, *out_buf, block);
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
if (!need_render_progress)
block_out_stream = context.getOutputStreamParallelIfPossible(current_format, *out_buf, block);
else
block_out_stream = context.getOutputStream(current_format, *out_buf, block);
block_out_stream->writePrefix();
}
}
@ -1991,15 +1996,18 @@ private:
written_first_block = true;
}
bool clear_progess = std_out.offset() > 0;
if (clear_progess)
bool clear_progress = false;
if (need_render_progress)
clear_progress = std_out.offset() > 0;
if (clear_progress)
clearProgress();
/// Received data block is immediately displayed to the user.
block_out_stream->flush();
/// Restore progress bar after data block.
if (clear_progess)
if (clear_progress)
writeProgress();
}

View File

@ -169,11 +169,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
if (offset_in_compressed_file || offset_in_decompressed_block)
{
if (!options.count("input"))
{
throw DB::Exception("--offset-in-compressed-file/--offset-in-decompressed-block requires --input", DB::ErrorCodes::BAD_ARGUMENTS);
}
CompressedReadBufferFromFile compressed_file(options["input"].as<std::string>(), 0, 0, 0);
CompressedReadBufferFromFile compressed_file(std::move(rb));
compressed_file.seek(offset_in_compressed_file, offset_in_decompressed_block);
copyData(compressed_file, *wb);
}

View File

@ -1180,7 +1180,7 @@ try
file_in.seek(0, SEEK_SET);
BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header);
BlockOutputStreamPtr output = context.getOutputStream(output_format, file_out, header);
if (processed_rows + source_rows > limit)
input = std::make_shared<LimitBlockInputStream>(input, limit - processed_rows, 0);

View File

@ -1,24 +1,26 @@
#include "MainHandler.h"
#include "validateODBCConnectionString.h"
#include <memory>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include "ODBCBlockInputStream.h"
#include "ODBCBlockOutputStream.h"
#include "getIdentifierQuote.h"
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromIStream.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <common/logger_useful.h>
#include <mutex>
#include <Poco/ThreadPool.h>
#include <IO/ReadBufferFromIStream.h>
#include <Columns/ColumnsNumber.h>
#include "getIdentifierQuote.h"
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <common/logger_useful.h>
#include <mutex>
#include <memory>
#if USE_ODBC
#include <Poco/Data/ODBC/SessionImpl.h>
@ -162,8 +164,9 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
auto pool = getPool(connection_string);
ReadBufferFromIStream read_buf(request.stream());
BlockInputStreamPtr input_stream = FormatFactory::instance().getInput(format, read_buf, *sample_block,
context, max_block_size);
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block,
context, max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
ODBCBlockOutputStream output_stream(pool->get(), db_name, table_name, *sample_block, quoting_style);
copyData(*input_stream, output_stream);
writeStringBinary("Ok.", out);
@ -173,7 +176,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
std::string query = params.get("query");
LOG_TRACE(log, "Query: {}", query);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, context);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStream(format, out, *sample_block, context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);

View File

@ -157,23 +157,7 @@ bool SettingsConstraints::checkImpl(const Settings & current_settings, SettingCh
const String & setting_name = change.name;
if (setting_name == "profile")
{
/// TODO Check profile settings in Context::setProfile(...), not here. It will be backward incompatible.
const String & profile_name = change.value.safeGet<String>();
const auto & profile_settings_changes = manager->getProfileSettings(profile_name);
try
{
/// NOTE We cannot use CLAMP_ON_VIOLATION here, because we cannot modify elements of profile_settings_changes
for (auto change_copy : *profile_settings_changes)
checkImpl(current_settings, change_copy, THROW_ON_VIOLATION);
}
catch (Exception & e)
{
e.addMessage(", while trying to set settings profile {}", profile_name);
throw;
}
return true;
}
bool cannot_cast;
auto cast_value = [&](const Field & x) -> Field

View File

@ -0,0 +1,39 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionAny(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyLastData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyHeavyData>(name, argument_types, parameters));
}
}
void registerAggregateFunctionsAny(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("any", { createAggregateFunctionAny, properties });
factory.registerFunction("anyLast", { createAggregateFunctionAnyLast, properties });
factory.registerFunction("anyHeavy", { createAggregateFunctionAnyHeavy, properties });
}
}

View File

@ -129,7 +129,7 @@ public:
return nested_func->allocatesMemoryInArena();
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -6,6 +6,7 @@
#include <array>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/assert_cast.h>
@ -42,6 +43,39 @@ public:
++data(place).count;
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
data(place).count += countBytesInFilter(flags);
}
else
{
data(place).count += batch_size;
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena *,
ssize_t if_argument_pos) const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
data(place).count += countBytesInFilterWithNull(flags, null_map);
}
else
{
data(place).count += batch_size - countBytesInFilter(null_map, batch_size);
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
data(place).count += data(rhs).count;

View File

@ -235,6 +235,8 @@ public:
{
return true;
}
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -252,6 +252,8 @@ public:
{
return nested_func->isState();
}
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};

View File

@ -188,13 +188,13 @@ public:
if (!limit_num_elems)
{
if (rhs_elems.value.size())
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
cur_elems.value.insertByOffsets(rhs_elems.value, 0, rhs_elems.value.size(), arena);
}
else
{
UInt64 elems_to_insert = std::min(static_cast<size_t>(max_elems) - cur_elems.value.size(), rhs_elems.value.size());
if (elems_to_insert)
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.begin() + elems_to_insert, arena);
cur_elems.value.insertByOffsets(rhs_elems.value, 0, elems_to_insert, arena);
}
}

View File

@ -80,6 +80,34 @@ public:
nested_func->add(place, columns, row_num, arena);
}
void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
nested_func->addBatch(batch_size, places, place_offset, columns, arena, num_arguments - 1);
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t) const override
{
nested_func->addBatchSinglePlace(batch_size, place, columns, arena, num_arguments - 1);
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
ssize_t) const override
{
nested_func->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, num_arguments - 1);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
nested_func->merge(place, rhs, arena);
@ -113,6 +141,8 @@ public:
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function, const DataTypes & arguments,
const Array & params, const AggregateFunctionProperties & properties) const override;
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -0,0 +1,34 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionMax(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMaxData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionArgMax(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMaxData>(name, argument_types, parameters));
}
}
void registerAggregateFunctionsMax(AggregateFunctionFactory & factory)
{
factory.registerFunction("max", createAggregateFunctionMax, AggregateFunctionFactory::CaseInsensitive);
/// The functions below depend on the order of data.
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("argMax", { createAggregateFunctionArgMax, properties });
}
}

View File

@ -102,6 +102,8 @@ public:
{
return nested_func->allocatesMemoryInArena();
}
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -0,0 +1,34 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionMin(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMinData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionArgMin(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMinData>(name, argument_types, parameters));
}
}
void registerAggregateFunctionsMin(AggregateFunctionFactory & factory)
{
factory.registerFunction("min", createAggregateFunctionMin, AggregateFunctionFactory::CaseInsensitive);
/// The functions below depend on the order of data.
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("argMin", { createAggregateFunctionArgMin, properties });
}
}

View File

@ -1,66 +0,0 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionAny(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyLastData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyHeavyData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionMin(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMinData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionMax(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMaxData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionArgMin(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMinData>(name, argument_types, parameters));
}
AggregateFunctionPtr createAggregateFunctionArgMax(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMaxData>(name, argument_types, parameters));
}
}
void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory & factory)
{
factory.registerFunction("min", createAggregateFunctionMin, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("max", createAggregateFunctionMax, AggregateFunctionFactory::CaseInsensitive);
/// The functions below depend on the order of data.
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("any", { createAggregateFunctionAny, properties });
factory.registerFunction("anyLast", { createAggregateFunctionAnyLast, properties });
factory.registerFunction("anyHeavy", { createAggregateFunctionAnyHeavy, properties });
factory.registerFunction("argMin", { createAggregateFunctionArgMin, properties });
factory.registerFunction("argMax", { createAggregateFunctionArgMax, properties });
}
}

View File

@ -180,6 +180,8 @@ public:
{
return nested_function->isState();
}
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};
@ -209,13 +211,15 @@ public:
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
const UInt8 * null_map = column->getNullMapData().data();
this->nested_function->addBatchSinglePlaceNotNull(batch_size, this->nestedPlace(place), &nested_column, null_map, arena);
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), &nested_column, null_map, arena, if_argument_pos);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))

View File

@ -2,6 +2,7 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadHelpers.h>
@ -96,37 +97,93 @@ public:
place[size_of_data] = 1;
}
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
nested_function->addBatch(batch_size, places, place_offset, columns, arena);
for (size_t i = 0; i < batch_size; ++i)
(places[i] + place_offset)[size_of_data] = 1;
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
add(places[i] + place_offset, columns, i, arena);
}
}
else
{
nested_function->addBatch(batch_size, places, place_offset, columns, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
(places[i] + place_offset)[size_of_data] = 1;
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
{
if (batch_size)
if (if_argument_pos >= 0)
{
nested_function->addBatchSinglePlace(batch_size, place, columns, arena);
place[size_of_data] = 1;
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
{
place[size_of_data] = 1;
break;
}
}
}
else
{
if (batch_size)
{
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, if_argument_pos);
place[size_of_data] = 1;
}
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const override
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
if (batch_size)
if (if_argument_pos >= 0)
{
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena);
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
{
if (!null_map[i])
if (flags[i] && !null_map[i])
{
place[size_of_data] = 1;
break;
}
}
}
else
{
if (batch_size)
{
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
{
if (!null_map[i])
{
place[size_of_data] = 1;
break;
}
}
}
}
}
void merge(
@ -207,6 +264,8 @@ public:
else
to.insertDefault();
}
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};
}

View File

@ -198,6 +198,8 @@ public:
col_offsets.getData().push_back(col.getData().size());
}
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};
}

View File

@ -79,7 +79,7 @@ public:
bool allocatesMemoryInArena() const override { return nested_func->allocatesMemoryInArena(); }
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -92,7 +92,7 @@ public:
return nested_func->allocatesMemoryInArena();
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -282,17 +282,41 @@ public:
}
/// Vectorized version when there is no GROUP BY keys.
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
add(place, columns, i, arena);
}
}
else
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *) const override
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena, ssize_t if_argument_pos)
const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i] && flags[i])
add(place, columns, i, arena);
}
else
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override

View File

@ -10,6 +10,7 @@
#include <Core/Block.h>
#include <Common/Exception.h>
#include <Core/Field.h>
#include <Columns/ColumnsNumber.h>
namespace DB
@ -143,19 +144,32 @@ public:
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
virtual void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
virtual void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const = 0;
/** The same for single place when need to aggregate only filtered data.
*/
virtual void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const = 0;
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
virtual void addBatchSinglePlaceFromInterval(
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1)
const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
@ -195,6 +209,11 @@ public:
return nullptr;
}
/** Return the nested function if this is an Aggregate Function Combinator.
* Otherwise return nullptr.
*/
virtual AggregateFunctionPtr getNestedFunction() const { return {}; }
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }
@ -220,31 +239,90 @@ public:
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
else
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
else
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const override
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i] && flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
else
{
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceFromInterval(
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1)
const override
{
for (size_t i = batch_begin; i < batch_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = batch_begin; i < batch_end; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
else
{
for (size_t i = batch_begin; i < batch_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchArray(

View File

@ -18,7 +18,9 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory &);
void registerAggregateFunctionsSequenceMatch(AggregateFunctionFactory &);
void registerAggregateFunctionWindowFunnel(AggregateFunctionFactory &);
void registerAggregateFunctionRate(AggregateFunctionFactory &);
void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory &);
void registerAggregateFunctionsMin(AggregateFunctionFactory &);
void registerAggregateFunctionsMax(AggregateFunctionFactory &);
void registerAggregateFunctionsAny(AggregateFunctionFactory &);
void registerAggregateFunctionsStatisticsStable(AggregateFunctionFactory &);
void registerAggregateFunctionsStatisticsSimple(AggregateFunctionFactory &);
void registerAggregateFunctionSum(AggregateFunctionFactory &);
@ -71,7 +73,9 @@ void registerAggregateFunctions()
registerAggregateFunctionsSequenceMatch(factory);
registerAggregateFunctionWindowFunnel(factory);
registerAggregateFunctionRate(factory);
registerAggregateFunctionsMinMaxAny(factory);
registerAggregateFunctionsMin(factory);
registerAggregateFunctionsMax(factory);
registerAggregateFunctionsAny(factory);
registerAggregateFunctionsStatisticsStable(factory);
registerAggregateFunctionsStatisticsSimple(factory);
registerAggregateFunctionSum(factory);

View File

@ -10,6 +10,7 @@ PEERDIR(
SRCS(
AggregateFunctionAggThrow.cpp
AggregateFunctionAny.cpp
AggregateFunctionArray.cpp
AggregateFunctionAvg.cpp
AggregateFunctionAvgWeighted.cpp
@ -30,9 +31,10 @@ SRCS(
AggregateFunctionIf.cpp
AggregateFunctionMLMethod.cpp
AggregateFunctionMannWhitney.cpp
AggregateFunctionMax.cpp
AggregateFunctionMaxIntersections.cpp
AggregateFunctionMerge.cpp
AggregateFunctionMinMaxAny.cpp
AggregateFunctionMin.cpp
AggregateFunctionNull.cpp
AggregateFunctionOrFill.cpp
AggregateFunctionQuantile.cpp

View File

@ -393,6 +393,12 @@ size_t ColumnAggregateFunction::byteSize() const
+ (my_arena ? my_arena->size() : 0);
}
size_t ColumnAggregateFunction::byteSizeAt(size_t) const
{
/// Lower estimate as aggregate function can allocate more data in Arena.
return sizeof(data[0]) + func->sizeOfData();
}
/// Like in byteSize(), the size is underestimated.
size_t ColumnAggregateFunction::allocatedBytes() const
{

View File

@ -163,6 +163,8 @@ public:
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;

View File

@ -403,6 +403,21 @@ size_t ColumnArray::byteSize() const
}
size_t ColumnArray::byteSizeAt(size_t n) const
{
const auto & offsets_data = getOffsets();
size_t pos = offsets_data[n - 1];
size_t end = offsets_data[n];
size_t res = sizeof(offsets_data[0]);
for (; pos < end; ++pos)
res += getData().byteSizeAt(pos);
return res;
}
size_t ColumnArray::allocatedBytes() const
{
return getData().allocatedBytes() + getOffsets().allocated_bytes();

View File

@ -84,6 +84,7 @@ public:
void updatePermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int nan_direction_hint, Permutation & res, EqualRanges& equal_range) const override;
void reserve(size_t n) override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;

View File

@ -187,6 +187,11 @@ public:
return data->byteSize() + sizeof(s);
}
size_t byteSizeAt(size_t) const override
{
return data->byteSizeAt(0);
}
size_t allocatedBytes() const override
{
return data->allocatedBytes() + sizeof(s);

View File

@ -87,6 +87,7 @@ public:
size_t size() const override { return data.size(); }
size_t byteSize() const override { return data.size() * sizeof(data[0]); }
size_t byteSizeAt(size_t) const override { return sizeof(data[0]); }
size_t allocatedBytes() const override { return data.allocated_bytes(); }
void protect() override { data.protect(); }
void reserve(size_t n) override { data.reserve(n); }

View File

@ -57,6 +57,11 @@ public:
return chars.size() + sizeof(n);
}
size_t byteSizeAt(size_t) const override
{
return n;
}
size_t allocatedBytes() const override
{
return chars.allocated_bytes() + sizeof(n);

View File

@ -140,6 +140,15 @@ size_t ColumnFunction::byteSize() const
return total_size;
}
size_t ColumnFunction::byteSizeAt(size_t n) const
{
size_t total_size = 0;
for (const auto & column : captured_columns)
total_size += column.column->byteSizeAt(n);
return total_size;
}
size_t ColumnFunction::allocatedBytes() const
{
size_t total_size = 0;

View File

@ -47,6 +47,7 @@ public:
void getExtremes(Field &, Field &) const override {}
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void appendArguments(const ColumnsWithTypeAndName & columns);

View File

@ -151,6 +151,7 @@ public:
void reserve(size_t n) override { idx.reserve(n); }
size_t byteSize() const override { return idx.getPositions()->byteSize() + getDictionary().byteSize(); }
size_t byteSizeAt(size_t n) const override { return getDictionary().byteSizeAt(getIndexes().getUInt(n)); }
size_t allocatedBytes() const override { return idx.getPositions()->allocatedBytes() + getDictionary().allocatedBytes(); }
void forEachSubcolumn(ColumnCallback callback) override

View File

@ -211,6 +211,11 @@ size_t ColumnMap::byteSize() const
return nested->byteSize();
}
size_t ColumnMap::byteSizeAt(size_t n) const
{
return nested->byteSizeAt(n);
}
size_t ColumnMap::allocatedBytes() const
{
return nested->allocatedBytes();

View File

@ -77,6 +77,7 @@ public:
void updatePermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_range) const override;
void reserve(size_t n) override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
void forEachSubcolumn(ColumnCallback callback) override;

View File

@ -495,6 +495,11 @@ size_t ColumnNullable::byteSize() const
return getNestedColumn().byteSize() + getNullMapColumn().byteSize();
}
size_t ColumnNullable::byteSizeAt(size_t n) const
{
return sizeof(getNullMapData()[0]) + getNestedColumn().byteSizeAt(n);
}
size_t ColumnNullable::allocatedBytes() const
{
return getNestedColumn().allocatedBytes() + getNullMapColumn().allocatedBytes();

View File

@ -101,6 +101,7 @@ public:
const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const override;
void reserve(size_t n) override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;

View File

@ -71,6 +71,12 @@ public:
return chars.size() + offsets.size() * sizeof(offsets[0]);
}
size_t byteSizeAt(size_t n) const override
{
assert(n < size());
return sizeAt(n) + sizeof(offsets[0]);
}
size_t allocatedBytes() const override
{
return chars.allocated_bytes() + offsets.allocated_bytes();

View File

@ -424,6 +424,14 @@ size_t ColumnTuple::byteSize() const
return res;
}
size_t ColumnTuple::byteSizeAt(size_t n) const
{
size_t res = 0;
for (const auto & column : columns)
res += column->byteSizeAt(n);
return res;
}
size_t ColumnTuple::allocatedBytes() const
{
size_t res = 0;

View File

@ -83,6 +83,7 @@ public:
void updatePermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int nan_direction_hint, Permutation & res, EqualRanges& equal_ranges) const override;
void reserve(size_t n) override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
void forEachSubcolumn(ColumnCallback callback) override;

View File

@ -88,6 +88,10 @@ public:
bool isNumeric() const override { return column_holder->isNumeric(); }
size_t byteSize() const override { return column_holder->byteSize(); }
size_t byteSizeAt(size_t n) const override
{
return getNestedColumn()->byteSizeAt(n);
}
void protect() override { column_holder->protect(); }
size_t allocatedBytes() const override
{

View File

@ -178,6 +178,11 @@ public:
return data.size() * sizeof(data[0]);
}
size_t byteSizeAt(size_t) const override
{
return sizeof(data[0]);
}
size_t allocatedBytes() const override
{
if constexpr (is_POD)

View File

@ -12,7 +12,54 @@
namespace DB
{
#if defined(__SSE2__) && defined(__POPCNT__)
/// Transform 64-byte mask to 64-bit mask.
static UInt64 toBits64(const Int8 * bytes64)
{
static const __m128i zero16 = _mm_setzero_si128();
return static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64)), zero16)))
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 16)), zero16)))
<< 16)
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 32)), zero16)))
<< 32)
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 48)), zero16)))
<< 48);
}
#endif
size_t countBytesInFilter(const UInt8 * filt, size_t sz)
{
size_t count = 0;
/** NOTE: In theory, `filt` should only contain zeros and ones.
* But, just in case, here the condition > 0 (to signed bytes) is used.
* It would be better to use != 0, then this does not allow SSE2.
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt);
const Int8 * end = pos + sz;
#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + sz / 64 * 64;
for (; pos < end64; pos += 64)
count += __builtin_popcountll(toBits64(pos));
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos)
count += *pos > 0;
return count;
}
size_t countBytesInFilter(const IColumn::Filter & filt)
{
return countBytesInFilter(filt.data(), filt.size());
}
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map)
{
size_t count = 0;
@ -22,32 +69,20 @@ size_t countBytesInFilter(const IColumn::Filter & filt)
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data());
const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map);
const Int8 * end = pos + filt.size();
#if defined(__SSE2__) && defined(__POPCNT__)
const __m128i zero16 = _mm_setzero_si128();
const Int8 * end64 = pos + filt.size() / 64 * 64;
for (; pos < end64; pos += 64)
count += __builtin_popcountll(
static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(pos)),
zero16)))
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 16)),
zero16))) << 16)
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 32)),
zero16))) << 32)
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 48)),
zero16))) << 48));
for (; pos < end64; pos += 64, pos2 += 64)
count += __builtin_popcountll(toBits64(pos) & ~toBits64(pos2));
/// TODO Add duff device for tail?
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos)
count += *pos > 0;
count += (*pos & ~*pos2) > 0;
return count;
}

View File

@ -15,7 +15,9 @@ namespace ErrorCodes
}
/// Counts how many bytes of `filt` are greater than zero.
size_t countBytesInFilter(const UInt8 * filt, size_t sz);
size_t countBytesInFilter(const IColumn::Filter & filt);
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map);
/// Returns vector with num_columns elements. vector[i] is the count of i values in selector.
/// Selector must contain values from 0 to num_columns - 1. NOTE: this is not checked.

View File

@ -333,6 +333,9 @@ public:
/// Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined.
virtual size_t byteSize() const = 0;
/// Size of single value in memory (for accounting purposes)
virtual size_t byteSizeAt(size_t /*n*/) const = 0;
/// Size of memory, allocated for column.
/// This is greater or equals to byteSize due to memory reservation in containers.
/// Zero, if could not be determined.

View File

@ -33,6 +33,7 @@ public:
void insertDefault() override { ++s; }
void popBack(size_t n) override { s -= n; }
size_t byteSize() const override { return 0; }
size_t byteSizeAt(size_t) const override { return 0; }
size_t allocatedBytes() const override { return 0; }
int compareAt(size_t, size_t, const IColumn &, int) const override { return 0; }
void compareColumn(const IColumn &, size_t, PaddedPODArray<UInt64> *, PaddedPODArray<Int8> &, int, int) const override

View File

@ -28,7 +28,7 @@
#include <common/mremap.h>
#include <common/getPageSize.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMemoryTracker.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>

View File

@ -28,7 +28,7 @@ namespace DB
* - put lot of strings inside pool, keep their addresses;
* - addresses remain valid during lifetime of pool;
* - at destruction of pool, all memory is freed;
* - memory is allocated and freed by large chunks;
* - memory is allocated and freed by large MemoryChunks;
* - freeing parts of data is not possible (but look at ArenaWithFreeLists if you need);
*/
class Arena : private boost::noncopyable
@ -37,16 +37,16 @@ private:
/// Padding allows to use 'memcpySmallAllowReadWriteOverflow15' instead of 'memcpy'.
static constexpr size_t pad_right = 15;
/// Contiguous chunk of memory and pointer to free space inside it. Member of single-linked list.
struct alignas(16) Chunk : private Allocator<false> /// empty base optimization
/// Contiguous MemoryChunk of memory and pointer to free space inside it. Member of single-linked list.
struct alignas(16) MemoryChunk : private Allocator<false> /// empty base optimization
{
char * begin;
char * pos;
char * end; /// does not include padding.
Chunk * prev;
MemoryChunk * prev;
Chunk(size_t size_, Chunk * prev_)
MemoryChunk(size_t size_, MemoryChunk * prev_)
{
ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
@ -59,7 +59,7 @@ private:
ASAN_POISON_MEMORY_REGION(begin, size_);
}
~Chunk()
~MemoryChunk()
{
/// We must unpoison the memory before returning to the allocator,
/// because the allocator might not have asan integration, and the
@ -80,8 +80,8 @@ private:
size_t growth_factor;
size_t linear_growth_threshold;
/// Last contiguous chunk of memory.
Chunk * head;
/// Last contiguous MemoryChunk of memory.
MemoryChunk * head;
size_t size_in_bytes;
size_t page_size;
@ -90,7 +90,7 @@ private:
return (s + page_size - 1) / page_size * page_size;
}
/// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
/// If MemoryChunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
/// (to not allocate too much excessive memory).
size_t nextSize(size_t min_next_size) const
{
@ -104,7 +104,7 @@ private:
{
// allocContinue() combined with linear growth results in quadratic
// behavior: we append the data by small amounts, and when it
// doesn't fit, we create a new chunk and copy all the previous data
// doesn't fit, we create a new MemoryChunk and copy all the previous data
// into it. The number of times we do this is directly proportional
// to the total size of data that is going to be serialized. To make
// the copying happen less often, round the next size up to the
@ -117,10 +117,10 @@ private:
return roundUpToPageSize(size_after_grow, page_size);
}
/// Add next contiguous chunk of memory with size not less than specified.
void NO_INLINE addChunk(size_t min_size)
/// Add next contiguous MemoryChunk of memory with size not less than specified.
void NO_INLINE addMemoryChunk(size_t min_size)
{
head = new Chunk(nextSize(min_size + pad_right), head);
head = new MemoryChunk(nextSize(min_size + pad_right), head);
size_in_bytes += head->size();
}
@ -130,7 +130,7 @@ private:
public:
Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
: growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_),
head(new Chunk(initial_size_, nullptr)), size_in_bytes(head->size()),
head(new MemoryChunk(initial_size_, nullptr)), size_in_bytes(head->size()),
page_size(static_cast<size_t>(::getPageSize()))
{
}
@ -144,7 +144,7 @@ public:
char * alloc(size_t size)
{
if (unlikely(head->pos + size > head->end))
addChunk(size);
addMemoryChunk(size);
char * res = head->pos;
head->pos += size;
@ -169,7 +169,7 @@ public:
return res;
}
addChunk(size + alignment);
addMemoryChunk(size + alignment);
} while (true);
}
@ -194,8 +194,8 @@ public:
/** Begin or expand a contiguous range of memory.
* 'range_start' is the start of range. If nullptr, a new range is
* allocated.
* If there is no space in the current chunk to expand the range,
* the entire range is copied to a new, bigger memory chunk, and the value
* If there is no space in the current MemoryChunk to expand the range,
* the entire range is copied to a new, bigger memory MemoryChunk, and the value
* of 'range_start' is updated.
* If the optional 'start_alignment' is specified, the start of range is
* kept aligned to this value.
@ -209,7 +209,7 @@ public:
/*
* Allocating zero bytes doesn't make much sense. Also, a zero-sized
* range might break the invariant that the range begins at least before
* the current chunk end.
* the current MemoryChunk end.
*/
assert(additional_bytes > 0);
@ -228,19 +228,19 @@ public:
// This method only works for extending the last allocation. For lack of
// original size, check a weaker condition: that 'begin' is at least in
// the current Chunk.
// the current MemoryChunk.
assert(range_start >= head->begin);
assert(range_start < head->end);
if (head->pos + additional_bytes <= head->end)
{
// The new size fits into the last chunk, so just alloc the
// The new size fits into the last MemoryChunk, so just alloc the
// additional size. We can alloc without alignment here, because it
// only applies to the start of the range, and we don't change it.
return alloc(additional_bytes);
}
// New range doesn't fit into this chunk, will copy to a new one.
// New range doesn't fit into this MemoryChunk, will copy to a new one.
//
// Note: among other things, this method is used to provide a hack-ish
// implementation of realloc over Arenas in ArenaAllocators. It wastes a
@ -301,16 +301,16 @@ public:
return res;
}
/// Size of chunks in bytes.
/// Size of MemoryChunks in bytes.
size_t size() const
{
return size_in_bytes;
}
/// Bad method, don't use it -- the chunks are not your business, the entire
/// Bad method, don't use it -- the MemoryChunks are not your business, the entire
/// purpose of the arena code is to manage them for you, so if you find
/// yourself having to use this method, probably you're doing something wrong.
size_t remainingSpaceInCurrentChunk() const
size_t remainingSpaceInCurrentMemoryChunk() const
{
return head->remaining();
}

View File

@ -0,0 +1,81 @@
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMemoryTracker.h>
namespace
{
MemoryTracker * getMemoryTracker()
{
if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
return thread_memory_tracker;
/// Once the main thread is initialized,
/// total_memory_tracker is initialized too.
/// And can be used, since MainThreadStatus is required for profiling.
if (DB::MainThreadStatus::get())
return &total_memory_tracker;
return nullptr;
}
}
namespace CurrentMemoryTracker
{
using DB::current_thread;
void alloc(Int64 size)
{
if (auto * memory_tracker = getMemoryTracker())
{
if (current_thread)
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->alloc(size);
}
}
}
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}
void free(Int64 size)
{
if (auto * memory_tracker = getMemoryTracker())
{
if (current_thread)
{
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->free(size);
}
}
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <common/types.h>
/// Convenience methods, that use current thread's memory_tracker if it is available.
namespace CurrentMemoryTracker
{
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
}

View File

@ -2,7 +2,7 @@
#include <common/defines.h>
#include <boost/context/stack_context.hpp>
#include <Common/formatReadable.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMemoryTracker.h>
#include <sys/time.h>
#include <sys/resource.h>

View File

@ -2,12 +2,20 @@
#include <Poco/Net/IPAddress.h>
#include <Poco/ByteOrder.h>
#include <Common/formatIPv6.h>
#include <cstring>
namespace DB
{
/// Result array could be indexed with all possible uint8 values without extra check.
/// For values greater than 128 we will store same value as for 128 (all bits set).
constexpr size_t IPV6_MASKS_COUNT = 256;
using RawMaskArray = std::array<uint8_t, IPV6_BINARY_LENGTH>;
void IPv6ToRawBinary(const Poco::Net::IPAddress & address, char * res)
{
if (Poco::Net::IPAddress::IPv6 == address.family())
@ -33,4 +41,33 @@ std::array<char, 16> IPv6ToBinary(const Poco::Net::IPAddress & address)
return res;
}
static constexpr RawMaskArray generateBitMask(size_t prefix)
{
if (prefix >= 128)
prefix = 128;
RawMaskArray arr{0};
size_t i = 0;
for (; prefix >= 8; ++i, prefix -= 8)
arr[i] = 0xff;
if (prefix > 0)
arr[i++] = ~(0xff >> prefix);
while (i < 16)
arr[i++] = 0x00;
return arr;
}
static constexpr std::array<RawMaskArray, IPV6_MASKS_COUNT> generateBitMasks()
{
std::array<RawMaskArray, IPV6_MASKS_COUNT> arr{};
for (size_t i = 0; i < IPV6_MASKS_COUNT; ++i)
arr[i] = generateBitMask(i);
return arr;
}
const uint8_t * getCIDRMaskIPv6(UInt8 prefix_len)
{
static constexpr std::array<RawMaskArray, IPV6_MASKS_COUNT> IPV6_RAW_MASK_ARRAY = generateBitMasks();
return IPV6_RAW_MASK_ARRAY[prefix_len].data();
}
}

View File

@ -14,4 +14,9 @@ void IPv6ToRawBinary(const Poco::Net::IPAddress & address, char * res);
/// Convert IP address to 16-byte array with IPv6 data (big endian). If it's an IPv4, map it to IPv6.
std::array<char, 16> IPv6ToBinary(const Poco::Net::IPAddress & address);
/// Returns pointer to 16-byte array containing mask with first `prefix_len` bits set to `1` and `128 - prefix_len` to `0`.
/// Pointer is valid during all program execution time and doesn't require freeing.
/// Values of prefix_len greater than 128 interpreted as 128 exactly.
const uint8_t * getCIDRMaskIPv6(UInt8 prefix_len);
}

View File

@ -2,7 +2,6 @@
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>
@ -16,20 +15,6 @@
namespace
{
MemoryTracker * getMemoryTracker()
{
if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
return thread_memory_tracker;
/// Once the main thread is initialized,
/// total_memory_tracker is initialized too.
/// And can be used, since MainThreadStatus is required for profiling.
if (DB::MainThreadStatus::get())
return &total_memory_tracker;
return nullptr;
}
/// MemoryTracker cannot throw MEMORY_LIMIT_EXCEEDED (either configured memory
/// limit reached or fault injected), in the following cases:
///
@ -41,9 +26,9 @@ MemoryTracker * getMemoryTracker()
/// NOTE: that since C++11 destructor marked with noexcept by default, and
/// this means that any throw from destructor (that is not marked with
/// noexcept(false)) will cause std::terminate()
bool inline memoryTrackerCanThrow()
bool inline memoryTrackerCanThrow(VariableContext level, bool fault_injection)
{
return !MemoryTracker::LockExceptionInThread::isBlocked() && !std::uncaught_exceptions();
return !MemoryTracker::LockExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
}
}
@ -64,8 +49,40 @@ namespace ProfileEvents
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
// BlockerInThread
thread_local uint64_t MemoryTracker::BlockerInThread::counter = 0;
thread_local VariableContext MemoryTracker::BlockerInThread::level = VariableContext::Global;
MemoryTracker::BlockerInThread::BlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTracker::BlockerInThread::~BlockerInThread()
{
--counter;
level = previous_level;
}
/// LockExceptionInThread
thread_local uint64_t MemoryTracker::LockExceptionInThread::counter = 0;
thread_local VariableContext MemoryTracker::LockExceptionInThread::level = VariableContext::Global;
thread_local bool MemoryTracker::LockExceptionInThread::block_fault_injections = false;
MemoryTracker::LockExceptionInThread::LockExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
MemoryTracker::LockExceptionInThread::~LockExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
@ -110,8 +127,13 @@ void MemoryTracker::alloc(Int64 size)
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
if (BlockerInThread::isBlocked())
if (BlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
return;
}
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
@ -144,7 +166,7 @@ void MemoryTracker::alloc(Int64 size)
}
std::bernoulli_distribution fault(fault_probability);
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow())
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true))
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock;
@ -173,7 +195,7 @@ void MemoryTracker::alloc(Int64 size)
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
}
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow())
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false))
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock;
@ -211,7 +233,7 @@ void MemoryTracker::updatePeak(Int64 will_be)
void MemoryTracker::free(Int64 size)
{
if (BlockerInThread::isBlocked())
if (BlockerInThread::isBlocked(level))
return;
std::bernoulli_distribution sample(sample_probability);
@ -292,60 +314,3 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
namespace CurrentMemoryTracker
{
using DB::current_thread;
void alloc(Int64 size)
{
if (auto * memory_tracker = getMemoryTracker())
{
if (current_thread)
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->alloc(size);
}
}
}
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}
void free(Int64 size)
{
if (auto * memory_tracker = getMemoryTracker())
{
if (current_thread)
{
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->free(size);
}
}
}
}

View File

@ -136,11 +136,20 @@ public:
private:
BlockerInThread(const BlockerInThread &) = delete;
BlockerInThread & operator=(const BlockerInThread &) = delete;
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
BlockerInThread() { ++counter; }
~BlockerInThread() { --counter; }
static bool isBlocked() { return counter > 0; }
/// level_ - block in level and above
BlockerInThread(VariableContext level_ = VariableContext::Global);
~BlockerInThread();
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
@ -160,21 +169,24 @@ public:
private:
LockExceptionInThread(const LockExceptionInThread &) = delete;
LockExceptionInThread & operator=(const LockExceptionInThread &) = delete;
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
LockExceptionInThread() { ++counter; }
~LockExceptionInThread() { --counter; }
static bool isBlocked() { return counter > 0; }
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
LockExceptionInThread(VariableContext level_ = VariableContext::Global, bool block_fault_injections_ = true);
~LockExceptionInThread();
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || (fault_injection && block_fault_injections));
}
};
};
extern MemoryTracker total_memory_tracker;
/// Convenience methods, that use current thread's memory_tracker if it is available.
namespace CurrentMemoryTracker
{
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
}

View File

@ -89,8 +89,8 @@ protected:
static constexpr size_t pad_right = integerRoundUp(pad_right_, ELEMENT_SIZE);
/// pad_left is also rounded up to 16 bytes to maintain alignment of allocated memory.
static constexpr size_t pad_left = integerRoundUp(integerRoundUp(pad_left_, ELEMENT_SIZE), 16);
/// Empty array will point to this static memory as padding.
static constexpr char * null = pad_left ? const_cast<char *>(empty_pod_array) + empty_pod_array_size : nullptr;
/// Empty array will point to this static memory as padding and begin/end.
static constexpr char * null = const_cast<char *>(empty_pod_array) + pad_left;
static_assert(pad_left <= empty_pod_array_size && "Left Padding exceeds empty_pod_array_size. Is the element size too large?");
@ -268,8 +268,11 @@ public:
reserve(required_capacity, std::forward<TAllocatorParams>(allocator_params)...);
size_t items_byte_size = byte_size(number_of_items);
memcpy(c_end, ptr, items_byte_size);
c_end += items_byte_size;
if (items_byte_size)
{
memcpy(c_end, ptr, items_byte_size);
c_end += items_byte_size;
}
}
void protect()
@ -289,6 +292,18 @@ public:
#endif
}
template <typename It1, typename It2>
inline void assertNotIntersects(It1 from_begin [[maybe_unused]], It2 from_end [[maybe_unused]])
{
#if !defined(NDEBUG)
const char * ptr_begin = reinterpret_cast<const char *>(&*from_begin);
const char * ptr_end = reinterpret_cast<const char *>(&*from_end);
/// Also it's safe if the range is empty.
assert(!((ptr_begin >= c_start && ptr_begin < c_end) || (ptr_end > c_start && ptr_end <= c_end)) || (ptr_begin == ptr_end));
#endif
}
~PODArrayBase()
{
dealloc();
@ -444,6 +459,7 @@ public:
template <typename It1, typename It2, typename ... TAllocatorParams>
void insertPrepare(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
{
this->assertNotIntersects(from_begin, from_end);
size_t required_capacity = this->size() + (from_end - from_begin);
if (required_capacity > this->capacity())
this->reserve(roundUpToPowerOfTwoOrZero(required_capacity), std::forward<TAllocatorParams>(allocator_params)...);
@ -457,6 +473,28 @@ public:
insert_assume_reserved(from_begin, from_end);
}
/// In contrast to 'insert' this method is Ok even for inserting from itself.
/// Because we obtain iterators after reserving memory.
template <typename Container, typename ... TAllocatorParams>
void insertByOffsets(Container && rhs, size_t from_begin, size_t from_end, TAllocatorParams &&... allocator_params)
{
static_assert(memcpy_can_be_used_for_assignment<std::decay_t<T>, std::decay_t<decltype(rhs.front())>>);
assert(from_end >= from_begin);
assert(from_end <= rhs.size());
size_t required_capacity = this->size() + (from_end - from_begin);
if (required_capacity > this->capacity())
this->reserve(roundUpToPowerOfTwoOrZero(required_capacity), std::forward<TAllocatorParams>(allocator_params)...);
size_t bytes_to_copy = this->byte_size(from_end - from_begin);
if (bytes_to_copy)
{
memcpy(this->c_end, reinterpret_cast<const void *>(rhs.begin() + from_begin), bytes_to_copy);
this->c_end += bytes_to_copy;
}
}
/// Works under assumption, that it's possible to read up to 15 excessive bytes after `from_end` and this PODArray is padded.
template <typename It1, typename It2, typename ... TAllocatorParams>
void insertSmallAllowReadWriteOverflow15(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
@ -476,6 +514,9 @@ public:
static_assert(memcpy_can_be_used_for_assignment<std::decay_t<T>, std::decay_t<decltype(*from_begin)>>);
size_t bytes_to_copy = this->byte_size(from_end - from_begin);
if (!bytes_to_copy)
return;
size_t bytes_to_move = this->byte_size(end() - it);
insertPrepare(from_begin, from_end);
@ -492,10 +533,14 @@ public:
void insert_assume_reserved(It1 from_begin, It2 from_end)
{
static_assert(memcpy_can_be_used_for_assignment<std::decay_t<T>, std::decay_t<decltype(*from_begin)>>);
this->assertNotIntersects(from_begin, from_end);
size_t bytes_to_copy = this->byte_size(from_end - from_begin);
memcpy(this->c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end += bytes_to_copy;
if (bytes_to_copy)
{
memcpy(this->c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end += bytes_to_copy;
}
}
template <typename... TAllocatorParams>
@ -626,15 +671,18 @@ public:
void assign(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
{
static_assert(memcpy_can_be_used_for_assignment<std::decay_t<T>, std::decay_t<decltype(*from_begin)>>);
this->assertNotIntersects(from_begin, from_end);
size_t required_capacity = from_end - from_begin;
if (required_capacity > this->capacity())
this->reserve_exact(required_capacity, std::forward<TAllocatorParams>(allocator_params)...);
size_t bytes_to_copy = this->byte_size(required_capacity);
memcpy(this->c_start, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end = this->c_start + bytes_to_copy;
if (bytes_to_copy)
{
memcpy(this->c_start, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end = this->c_start + bytes_to_copy;
}
}
// ISO C++ has strict ambiguity rules, thus we cannot apply TAllocatorParams here.

View File

@ -13,7 +13,6 @@
#include <Common/ThreadStatus.h>
#include <ext/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
@ -188,7 +187,7 @@ public:
ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs)
{
if (joinable())
std::terminate();
abort();
state = std::move(rhs.state);
return *this;
}
@ -196,13 +195,13 @@ public:
~ThreadFromGlobalPool()
{
if (joinable())
std::terminate();
abort();
}
void join()
{
if (!joinable())
std::terminate();
abort();
state->wait();
state.reset();
@ -211,7 +210,7 @@ public:
void detach()
{
if (!joinable())
std::terminate();
abort();
state.reset();
}

View File

@ -5,33 +5,25 @@
#include <common/arithmeticOverflow.h>
#include <Common/Exception.h>
#include <Common/UnicodeBar.h>
#include <Common/NaNUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int PARAMETER_OUT_OF_BOUND;
}
}
#include <iostream>
namespace UnicodeBar
{
double getWidth(Int64 x, Int64 min, Int64 max, double max_width)
double getWidth(double x, double min, double max, double max_width)
{
if (isNaN(x))
return 0;
if (x <= min)
return 0;
if (x >= max)
return max_width;
/// The case when max - min overflows
Int64 max_difference;
if (common::subOverflow(max, min, max_difference))
throw DB::Exception(DB::ErrorCodes::PARAMETER_OUT_OF_BOUND, "The arguments to render unicode bar will lead to arithmetic overflow");
return (x - min) * max_width / max_difference;
return (x - min) / (max - min) * max_width;
}
size_t getWidthInBytes(double width)

View File

@ -10,7 +10,7 @@
*/
namespace UnicodeBar
{
double getWidth(Int64 x, Int64 min, Int64 max, double max_width);
double getWidth(double x, double min, double max, double max_width);
size_t getWidthInBytes(double width);
/// In `dst` there must be a space for barWidthInBytes(width) characters and a trailing zero.

View File

@ -1,5 +1,5 @@
#include <common/memory.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMemoryTracker.h>
#include <iostream>
#include <new>

View File

@ -33,6 +33,7 @@ SRCS(
Config/ConfigProcessor.cpp
Config/ConfigReloader.cpp
Config/configReadClient.cpp
CurrentMemoryTracker.cpp
CurrentMetrics.cpp
CurrentThread.cpp
DNSResolver.cpp

View File

@ -121,6 +121,7 @@ class IColumn;
\
M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
M(Bool, output_format_parallel_formatting, true, "Enable parallel formatting for some data formats.", 0) \
\
M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \

View File

@ -1,313 +0,0 @@
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <IO/ReadBuffer.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <ext/scope_guard.h>
namespace DB
{
ParallelParsingBlockInputStream::ParallelParsingBlockInputStream(const Params & params)
: header(params.input_creator_params.sample),
row_input_format_params(params.input_creator_params.row_input_format_params),
format_settings(params.input_creator_params.settings),
input_processor_creator(params.input_processor_creator),
min_chunk_bytes(params.min_chunk_bytes),
original_buffer(params.read_buffer),
// Subtract one thread that we use for segmentation and one for
// reading. After that, must have at least two threads left for
// parsing. See the assertion below.
pool(std::max(2, static_cast<int>(params.max_threads) - 2)),
file_segmentation_engine(params.file_segmentation_engine)
{
// See comment above.
assert(params.max_threads >= 4);
// One unit for each thread, including segmentator and reader, plus a
// couple more units so that the segmentation thread doesn't spuriously
// bump into reader thread on wraparound.
processing_units.resize(params.max_threads + 2);
segmentator_thread = ThreadFromGlobalPool(
&ParallelParsingBlockInputStream::segmentatorThreadFunction, this, CurrentThread::getGroup());
}
ParallelParsingBlockInputStream::~ParallelParsingBlockInputStream()
{
finishAndWait();
}
void ParallelParsingBlockInputStream::cancel(bool kill)
{
/**
* Can be called multiple times, from different threads. Saturate the
* the kill flag with OR.
*/
if (kill)
is_killed = true;
is_cancelled = true;
/*
* The format parsers themselves are not being cancelled here, so we'll
* have to wait until they process the current block. Given that the
* chunk size is on the order of megabytes, this shouldn't be too long.
* We can't call IInputFormat->cancel here, because the parser object is
* local to the parser thread, and we don't want to introduce any
* synchronization between parser threads and the other threads to get
* better performance. An ideal solution would be to add a callback to
* IInputFormat that checks whether it was cancelled.
*/
finishAndWait();
}
void ParallelParsingBlockInputStream::scheduleParserThreadForUnitWithNumber(size_t ticket_number)
{
pool.scheduleOrThrowOnError([this, ticket_number, group = CurrentThread::getGroup()]()
{
parserThreadFunction(group, ticket_number);
});
}
void ParallelParsingBlockInputStream::finishAndWait()
{
finished = true;
{
std::unique_lock<std::mutex> lock(mutex);
segmentator_condvar.notify_all();
reader_condvar.notify_all();
}
if (segmentator_thread.joinable())
segmentator_thread.join();
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
setThreadName("Segmentator");
try
{
while (!finished)
{
const auto current_unit_number = segmentator_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
{
std::unique_lock<std::mutex> lock(mutex);
segmentator_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || finished; });
}
if (finished)
{
break;
}
assert(unit.status == READY_TO_INSERT);
// Segmentating the original input.
unit.segment.resize(0);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(
original_buffer, unit.segment, min_chunk_bytes);
unit.offset = successfully_read_rows_count;
successfully_read_rows_count += currently_read_rows;
unit.is_last = !have_more_data;
unit.status = READY_TO_PARSE;
scheduleParserThreadForUnitWithNumber(segmentator_ticket_number);
++segmentator_ticket_number;
if (!have_more_data)
{
break;
}
}
}
catch (...)
{
onBackgroundException(successfully_read_rows_count);
}
}
void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
setThreadName("ChunkParser");
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
try
{
/*
* This is kind of suspicious -- the input_process_creator contract with
* respect to multithreaded use is not clear, but we hope that it is
* just a 'normal' factory class that doesn't have any state, and so we
* can use it from multiple threads simultaneously.
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings);
format->setCurrentUnitNumber(current_ticket_number);
auto parser = std::make_unique<InputStreamFromInputFormat>(std::move(format));
unit.block_ext.block.clear();
unit.block_ext.block_missing_values.clear();
// We don't know how many blocks will be. So we have to read them all
// until an empty block occurred.
Block block;
while (!finished && (block = parser->read()) != Block())
{
unit.block_ext.block.emplace_back(block);
unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues());
}
// We suppose we will get at least some blocks for a non-empty buffer,
// except at the end of file. Also see a matching assert in readImpl().
assert(unit.is_last || !unit.block_ext.block.empty());
std::unique_lock<std::mutex> lock(mutex);
unit.status = READY_TO_READ;
reader_condvar.notify_all();
}
catch (...)
{
onBackgroundException(unit.offset);
}
}
void ParallelParsingBlockInputStream::onBackgroundException(size_t offset)
{
std::unique_lock<std::mutex> lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();
if (ParsingException * e = exception_cast<ParsingException *>(background_exception))
if (e->getLineNumber() != -1)
e->setLineNumber(e->getLineNumber() + offset);
}
tryLogCurrentException(__PRETTY_FUNCTION__);
finished = true;
reader_condvar.notify_all();
segmentator_condvar.notify_all();
}
Block ParallelParsingBlockInputStream::readImpl()
{
if (isCancelledOrThrowIfKilled() || finished)
{
/**
* Check for background exception and rethrow it before we return.
*/
std::unique_lock<std::mutex> lock(mutex);
if (background_exception)
{
lock.unlock();
cancel(false);
std::rethrow_exception(background_exception);
}
return Block{};
}
const auto current_unit_number = reader_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
if (!next_block_in_current_unit.has_value())
{
// We have read out all the Blocks from the previous Processing Unit,
// wait for the current one to become ready.
std::unique_lock<std::mutex> lock(mutex);
reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; });
if (finished)
{
/**
* Check for background exception and rethrow it before we return.
*/
if (background_exception)
{
lock.unlock();
cancel(false);
std::rethrow_exception(background_exception);
}
return Block{};
}
assert(unit.status == READY_TO_READ);
next_block_in_current_unit = 0;
}
if (unit.block_ext.block.empty())
{
/*
* Can we get zero blocks for an entire segment, when the format parser
* skips it entire content and does not create any blocks? Probably not,
* but if we ever do, we should add a loop around the above if, to skip
* these. Also see a matching assert in the parser thread.
*/
assert(unit.is_last);
finished = true;
return Block{};
}
assert(next_block_in_current_unit.value() < unit.block_ext.block.size());
Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit));
last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]);
next_block_in_current_unit.value() += 1;
if (*next_block_in_current_unit == unit.block_ext.block.size())
{
// Finished reading this Processing Unit, move to the next one.
next_block_in_current_unit.reset();
++reader_ticket_number;
if (unit.is_last)
{
// It it was the last unit, we're finished.
finished = true;
}
else
{
// Pass the unit back to the segmentator.
std::unique_lock<std::mutex> lock(mutex);
unit.status = READY_TO_INSERT;
segmentator_condvar.notify_all();
}
}
return res;
}
}

View File

@ -1,181 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Common/ThreadPool.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
namespace DB
{
class ReadBuffer;
/**
* ORDER-PRESERVING parallel parsing of data formats.
* It splits original data into chunks. Then each chunk is parsed by different thread.
* The number of chunks equals to the number or parser threads.
* The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting.
*
* This stream has three kinds of threads: one segmentator, multiple parsers,
* and one reader thread -- that is, the one from which readImpl() is called.
* They operate one after another on parts of data called "processing units".
* One unit consists of buffer with raw data from file, filled by segmentator
* thread. This raw data is then parsed by a parser thread to form a number of
* Blocks. These Blocks are returned to the parent stream from readImpl().
* After being read out, a processing unit is reused, to save on allocating
* memory for the raw buffer. The processing units are organized into a circular
* array to facilitate reuse and to apply backpressure on the segmentator thread
* -- after it runs out of processing units, it has to wait for the reader to
* read out the previous blocks.
* The outline of what the threads do is as follows:
* segmentator thread:
* 1) wait for the next processing unit to become empty
* 2) fill it with a part of input file
* 3) start a parser thread
* 4) repeat until eof
* parser thread:
* 1) parse the given raw buffer without any synchronization
* 2) signal that the given unit is ready to read
* 3) finish
* readImpl():
* 1) wait for the next processing unit to become ready to read
* 2) take the blocks from the processing unit to return them to the caller
* 3) signal that the processing unit is empty
* 4) repeat until it encounters unit that is marked as "past_the_end"
* All threads must also check for cancel/eof/exception flags.
*/
class ParallelParsingBlockInputStream : public IBlockInputStream
{
private:
using ReadCallback = std::function<void()>;
using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
public:
struct InputCreatorParams
{
const Block & sample;
const RowInputFormatParams & row_input_format_params;
const FormatSettings &settings;
};
struct Params
{
ReadBuffer & read_buffer;
const InputProcessorCreator & input_processor_creator;
const InputCreatorParams & input_creator_params;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
size_t max_threads;
size_t min_chunk_bytes;
};
explicit ParallelParsingBlockInputStream(const Params & params);
~ParallelParsingBlockInputStream() override;
String getName() const override { return "ParallelParsing"; }
Block getHeader() const override { return header; }
void cancel(bool kill) override;
protected:
// Reader routine
Block readImpl() override;
const BlockMissingValues & getMissingValues() const override
{
return last_block_missing_values;
}
private:
const Block header;
const RowInputFormatParams row_input_format_params;
const FormatSettings format_settings;
const InputProcessorCreator input_processor_creator;
const size_t min_chunk_bytes;
/*
* This is declared as atomic to avoid UB, because parser threads access it
* without synchronization.
*/
std::atomic<bool> finished{false};
BlockMissingValues last_block_missing_values;
// Original ReadBuffer to read from.
ReadBuffer & original_buffer;
//Non-atomic because it is used in one thread.
std::optional<size_t> next_block_in_current_unit;
size_t segmentator_ticket_number{0};
size_t reader_ticket_number{0};
std::mutex mutex;
std::condition_variable reader_condvar;
std::condition_variable segmentator_condvar;
// There are multiple "parsers", that's why we use thread pool.
ThreadPool pool;
// Reading and segmentating the file
ThreadFromGlobalPool segmentator_thread;
// Function to segment the file. Then "parsers" will parse that segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
enum ProcessingUnitStatus
{
READY_TO_INSERT,
READY_TO_PARSE,
READY_TO_READ
};
struct BlockExt
{
std::vector<Block> block;
std::vector<BlockMissingValues> block_missing_values;
};
struct ProcessingUnit
{
explicit ProcessingUnit()
: status(ProcessingUnitStatus::READY_TO_INSERT)
{
}
BlockExt block_ext;
Memory<> segment;
std::atomic<ProcessingUnitStatus> status;
/// Needed for better exception message.
size_t offset = 0;
bool is_last{false};
};
std::exception_ptr background_exception = nullptr;
// We use deque instead of vector, because it does not require a move
// constructor, which is absent for atomics that are inside ProcessingUnit.
std::deque<ProcessingUnit> processing_units;
/// Compute it to have a more understandable error message.
size_t successfully_read_rows_count{0};
void scheduleParserThreadForUnitWithNumber(size_t ticket_number);
void finishAndWait();
void segmentatorThreadFunction(ThreadGroupStatusPtr thread_group);
void parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number);
// Save/log a background exception, set termination flag, wake up all
// threads. This function is used by segmentator and parsed threads.
// readImpl() is called from the main thread, so the exception handling
// is different.
void onBackgroundException(size_t offset);
};
}

View File

@ -35,7 +35,6 @@ SRCS(
MongoDBBlockInputStream.cpp
NativeBlockInputStream.cpp
NativeBlockOutputStream.cpp
ParallelParsingBlockInputStream.cpp
PushingToViewsBlockOutputStream.cpp
RemoteBlockInputStream.cpp
RemoteBlockOutputStream.cpp

View File

@ -184,7 +184,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64
context, format, sample_block, command, log,
[&ids, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputFormat(format, out, sample_block);
auto output_stream = context.getOutputStream(format, out, sample_block);
formatIDs(output_stream, ids);
out.close();
});
@ -198,7 +198,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col
context, format, sample_block, command, log,
[key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputFormat(format, out, sample_block);
auto output_stream = context.getOutputStream(format, out, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
out.close();
});

View File

@ -134,7 +134,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputFormat(format, out_buffer, sample_block);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
formatIDs(output_stream, ids);
};
@ -153,7 +153,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputFormat(format, out_buffer, sample_block);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
};

View File

@ -4,6 +4,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
@ -47,8 +48,8 @@ namespace
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, callback, timeouts);
reader
= FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
auto format = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(format);
}
Block getHeader() const override { return reader->getHeader(); }

View File

@ -5,18 +5,22 @@
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/PostgreSQLOutputFormat.h>
#include <Processors/Formats/Impl/NativeFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <Processors/Formats/Impl/ParallelFormattingOutputFormat.h>
#include <Poco/URI.h>
#include <IO/ReadHelpers.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
@ -132,7 +136,7 @@ FormatSettings getFormatSettings<Settings>(const Context & context,
const Settings & settings);
BlockInputStreamPtr FormatFactory::getInput(
InputFormatPtr FormatFactory::getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
@ -141,19 +145,14 @@ BlockInputStreamPtr FormatFactory::getInput(
const std::optional<FormatSettings> & _format_settings) const
{
if (name == "Native")
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
return std::make_shared<NativeInputFormatFromNativeBlockInputStream>(sample, buf);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
if (!getCreators(name).input_processor_creator)
{
const auto & input_getter = getCreators(name).input_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
return input_getter(buf, sample, max_block_size, {}, format_settings);
throw Exception("Format " + name + " is not suitable for input (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
}
const Settings & settings = context.getSettingsRef();
@ -166,6 +165,9 @@ BlockInputStreamPtr FormatFactory::getInput(
if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
parallel_parsing = false;
if (settings.max_memory_usage_for_user && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage_for_user)
parallel_parsing = false;
if (parallel_parsing && name == "JSONEachRow")
{
/// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
@ -179,8 +181,6 @@ BlockInputStreamPtr FormatFactory::getInput(
if (parallel_parsing)
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
@ -189,23 +189,56 @@ BlockInputStreamPtr FormatFactory::getInput(
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto input_creator_params =
ParallelParsingBlockInputStream::InputCreatorParams{sample,
row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
settings.max_threads,
settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputStream>(params);
/// Const reference is copied to lambda.
auto parser_creator = [input_getter, sample, row_input_format_params, format_settings]
(ReadBuffer & input) -> InputFormatPtr
{ return input_getter(input, sample, row_input_format_params, format_settings); };
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingInputFormat>(params);
}
auto format = getInputFormat(name, buf, sample, context, max_block_size,
format_settings);
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
auto format = getInputFormat(name, buf, sample, context, max_block_size, format_settings);
return format;
}
BlockOutputStreamPtr FormatFactory::getOutputStreamParallelIfPossible(const String & name,
WriteBuffer & buf, const Block & sample, const Context & context,
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
const Settings & settings = context.getSettingsRef();
bool parallel_formatting = settings.output_format_parallel_formatting;
if (output_getter && parallel_formatting && getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
auto formatter_creator = [output_getter, sample, callback, format_settings]
(WriteBuffer & output) -> OutputFormatPtr
{ return output_getter(output, sample, {std::move(callback)}, format_settings);};
ParallelFormattingOutputFormat::Params params{buf, sample, formatter_creator, settings.max_threads};
auto format = std::make_shared<ParallelFormattingOutputFormat>(params);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
format->setAutoFlush();
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
return getOutputStream(name, buf, sample, context, callback, _format_settings);
}
BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
BlockOutputStreamPtr FormatFactory::getOutputStream(const String & name,
WriteBuffer & buf, const Block & sample, const Context & context,
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
{
@ -226,10 +259,8 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback),
format_settings);
return std::make_shared<MaterializingBlockOutputStream>(
std::make_shared<OutputStreamToOutputFormat>(format), sample);
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), _format_settings);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
@ -266,6 +297,35 @@ InputFormatPtr FormatFactory::getInputFormat(
return format;
}
OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
const Settings & settings = context.getSettingsRef();
if (settings.output_format_parallel_formatting && getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto formatter_creator = [output_getter, sample, callback, format_settings]
(WriteBuffer & output) -> OutputFormatPtr
{ return output_getter(output, sample, {std::move(callback)}, format_settings);};
ParallelFormattingOutputFormat::Params builder{buf, sample, formatter_creator, settings.max_threads};
return std::make_shared<ParallelFormattingOutputFormat>(builder);
}
return getOutputFormat(name, buf, sample, context, callback, _format_settings);
}
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample,
@ -274,7 +334,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
RowOutputFormatParams params;
params.callback = std::move(callback);
@ -339,6 +399,16 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
target = std::move(file_segmentation_engine);
}
void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & name)
{
auto & target = dict[name].supports_parallel_formatting;
if (target)
throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting.", ErrorCodes::LOGICAL_ERROR);
target = true;
}
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;

View File

@ -79,11 +79,13 @@ private:
WriteCallback callback,
const FormatSettings & settings)>;
using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
using InputProcessorCreatorFunc = InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings);
using InputProcessorCreator = std::function<InputProcessorCreatorFunc>;
using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
@ -98,6 +100,7 @@ private:
InputProcessorCreator input_processor_creator;
OutputProcessorCreator output_processor_creator;
FileSegmentationEngine file_segmentation_engine;
bool supports_parallel_formatting{false};
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -105,7 +108,7 @@ private:
public:
static FormatFactory & instance();
BlockInputStreamPtr getInput(
InputFormatPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
@ -113,7 +116,14 @@ public:
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
/// Checks all preconditions. Returns ordinary stream if parallel formatting cannot be done.
/// Currently used only in Client. Don't use it something else! Better look at getOutputFormatParallelIfPossible.
BlockOutputStreamPtr getOutputStreamParallelIfPossible(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Currently used only in Client. Don't use it something else! Better look at getOutputFormat.
BlockOutputStreamPtr getOutputStream(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
@ -125,6 +135,12 @@ public:
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
OutputFormatPtr getOutputFormatParallelIfPossible(
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample,
const Context & context, WriteCallback callback = {},
@ -138,6 +154,8 @@ public:
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);
void markOutputFormatSupportsParallelFormatting(const String & name);
const FormatsDictionary & getAllFormats() const
{
return dict;

View File

@ -18,7 +18,6 @@ target_link_libraries(clickhouse_functions
clickhouse_dictionaries_embedded
clickhouse_parsers
consistent-hashing
consistent-hashing-sumbur
dbms
metrohash
murmurhash

View File

@ -416,8 +416,8 @@ private:
DivideIntegralImpl<NativeResultType, NativeResultType>, /// substitute divide by intDiv (throw on division by zero)
Operation<NativeResultType, NativeResultType>>;
template <OpCase op_case, OpCase target>
static auto unwrap(const auto& elem, size_t i)
template <OpCase op_case, OpCase target, class E>
static auto unwrap(const E& elem, size_t i)
{
if constexpr (op_case == target)
return undec(elem);
@ -744,8 +744,8 @@ class FunctionBinaryArithmetic : public IFunction
return function->execute(new_arguments, result_type, input_rows_count);
}
template <class T, class ResultDataType>
static auto helperGetOrConvert(const auto & col_const, const auto & col)
template <class T, class ResultDataType, class CC, class C>
static auto helperGetOrConvert(const CC & col_const, const C & col)
{
using ResultType = typename ResultDataType::FieldType;
using NativeResultType = typename NativeType<ResultType>::Type;
@ -756,8 +756,9 @@ class FunctionBinaryArithmetic : public IFunction
return col_const->template getValue<T>();
}
template <OpCase op_case, bool left_decimal, bool right_decimal, class OpImpl, class OpImplCheck>
void helperInvokeEither(const auto& left, const auto& right, auto& vec_res, auto scale_a, auto scale_b) const
template <OpCase op_case, bool left_decimal, bool right_decimal, class OpImpl, class OpImplCheck,
class L, class R, class VR, class SA, class SB>
void helperInvokeEither(const L& left, const R& right, VR& vec_res, SA scale_a, SB scale_b) const
{
if (check_decimal_overflow)
OpImplCheck::template process<op_case, left_decimal, right_decimal>(left, right, vec_res, scale_a, scale_b);
@ -765,11 +766,12 @@ class FunctionBinaryArithmetic : public IFunction
OpImpl::template process<op_case, left_decimal, right_decimal>(left, right, vec_res, scale_a, scale_b);
}
template <class LeftDataType, class RightDataType, class ResultDataType>
template <class LeftDataType, class RightDataType, class ResultDataType,
class L, class R, class CL, class CR>
ColumnPtr executeNumericWithDecimal(
const auto & left, const auto & right,
const L & left, const R & right,
const ColumnConst * const col_left_const, const ColumnConst * const col_right_const,
const auto * const col_left, const auto * const col_right,
const CL * const col_left, const CR * const col_right,
size_t col_left_size) const
{
using T0 = typename LeftDataType::FieldType;

View File

@ -1,7 +1,8 @@
#pragma once
#include <Common/hex.h>
#include <Common/formatIPv6.h>
#include <Common/hex.h>
#include <Common/IPv6ToBinary.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeFactory.h>
@ -1617,20 +1618,28 @@ public:
class FunctionIPv6CIDRToRange : public IFunction
{
private:
/// TODO Inefficient.
#if defined(__SSE2__)
#include <emmintrin.h>
static inline void applyCIDRMask(const UInt8 * __restrict src, UInt8 * __restrict dst_lower, UInt8 * __restrict dst_upper, UInt8 bits_to_keep)
{
__m128i mask = _mm_loadu_si128(reinterpret_cast<const __m128i *>(getCIDRMaskIPv6(bits_to_keep)));
__m128i lower = _mm_and_si128(_mm_loadu_si128(reinterpret_cast<const __m128i *>(src)), mask);
_mm_storeu_si128(reinterpret_cast<__m128i *>(dst_lower), lower);
__m128i inv_mask = _mm_xor_si128(mask, _mm_cmpeq_epi32(_mm_setzero_si128(), _mm_setzero_si128()));
__m128i upper = _mm_or_si128(lower, inv_mask);
_mm_storeu_si128(reinterpret_cast<__m128i *>(dst_upper), upper);
}
#else
/// NOTE IPv6 is stored in memory in big endian format that makes some difficulties.
static void applyCIDRMask(const UInt8 * __restrict src, UInt8 * __restrict dst_lower, UInt8 * __restrict dst_upper, UInt8 bits_to_keep)
{
UInt8 mask[16]{};
UInt8 bytes_to_keep = bits_to_keep / 8;
UInt8 bits_to_keep_in_last_byte = bits_to_keep % 8;
for (size_t i = 0; i < bits_to_keep / 8; ++i)
mask[i] = 0xFFU;
if (bits_to_keep_in_last_byte)
mask[bytes_to_keep] = 0xFFU << (8 - bits_to_keep_in_last_byte);
const auto * mask = getCIDRMaskIPv6(bits_to_keep);
for (size_t i = 0; i < 16; ++i)
{
@ -1639,6 +1648,8 @@ private:
}
}
#endif
public:
static constexpr auto name = "IPv6CIDRToRange";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionIPv6CIDRToRange>(); }

View File

@ -2,7 +2,6 @@
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Common/UnicodeBar.h>
#include <Common/FieldVisitors.h>
#include <IO/WriteHelpers.h>
@ -57,23 +56,30 @@ public:
+ ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!isNativeNumber(arguments[0]) || !isNativeNumber(arguments[1]) || !isNativeNumber(arguments[2])
|| (arguments.size() == 4 && !isNativeNumber(arguments[3])))
if (!isNumber(arguments[0]) || !isNumber(arguments[1]) || !isNumber(arguments[2])
|| (arguments.size() == 4 && !isNumber(arguments[3])))
throw Exception("All arguments for function " + getName() + " must be numeric.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>();
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2, 3}; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {3}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
Int64 min = extractConstant<Int64>(arguments, 1, "Second"); /// The level at which the line has zero length.
Int64 max = extractConstant<Int64>(arguments, 2, "Third"); /// The level at which the line has the maximum length.
/// The maximum width of the bar in characters.
Float64 max_width = 80; /// Motivated by old-school terminal size.
/// The maximum width of the bar in characters, by default.
Float64 max_width = arguments.size() == 4 ? extractConstant<Float64>(arguments, 3, "Fourth") : 80;
if (arguments.size() == 4)
{
const auto & max_width_column = *arguments[3].column;
if (!isColumnConst(max_width_column))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Fourth argument for function {} must be constant", getName());
max_width = max_width_column.getFloat64(0);
}
if (isNaN(max_width))
throw Exception("Argument 'max_width' must not be NaN", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
@ -86,83 +92,32 @@ public:
const auto & src = *arguments[0].column;
auto res_column = ColumnString::create();
if (executeNumber<UInt8>(src, *res_column, min, max, max_width)
|| executeNumber<UInt16>(src, *res_column, min, max, max_width)
|| executeNumber<UInt32>(src, *res_column, min, max, max_width)
|| executeNumber<UInt64>(src, *res_column, min, max, max_width)
|| executeNumber<Int8>(src, *res_column, min, max, max_width)
|| executeNumber<Int16>(src, *res_column, min, max, max_width)
|| executeNumber<Int32>(src, *res_column, min, max, max_width)
|| executeNumber<Int64>(src, *res_column, min, max, max_width)
|| executeNumber<Float32>(src, *res_column, min, max, max_width)
|| executeNumber<Float64>(src, *res_column, min, max, max_width))
{
return res_column;
}
else
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
private:
template <typename T>
T extractConstant(const ColumnsWithTypeAndName & arguments, size_t argument_pos, const char * which_argument) const
{
const auto & column = *arguments[argument_pos].column;
if (!isColumnConst(column))
throw Exception(
which_argument + String(" argument for function ") + getName() + " must be constant.", ErrorCodes::ILLEGAL_COLUMN);
return applyVisitor(FieldVisitorConvertToNumber<T>(), column[0]);
}
template <typename T>
static void fill(const PaddedPODArray<T> & src,
ColumnString::Chars & dst_chars,
ColumnString::Offsets & dst_offsets,
Int64 min,
Int64 max,
Float64 max_width)
{
size_t size = src.size();
size_t current_offset = 0;
dst_offsets.resize(size);
dst_chars.reserve(size * (UnicodeBar::getWidthInBytes(max_width) + 1)); /// lines 0-terminated.
auto res_column = ColumnString::create();
for (size_t i = 0; i < size; ++i)
ColumnString::Chars & dst_chars = res_column->getChars();
ColumnString::Offsets & dst_offsets = res_column->getOffsets();
dst_offsets.resize(input_rows_count);
dst_chars.reserve(input_rows_count * (UnicodeBar::getWidthInBytes(max_width) + 1)); /// strings are 0-terminated.
for (size_t i = 0; i < input_rows_count; ++i)
{
Float64 width = UnicodeBar::getWidth(src[i], min, max, max_width);
Float64 width = UnicodeBar::getWidth(
src.getFloat64(i),
arguments[1].column->getFloat64(i),
arguments[2].column->getFloat64(i),
max_width);
size_t next_size = current_offset + UnicodeBar::getWidthInBytes(width) + 1;
dst_chars.resize(next_size);
UnicodeBar::render(width, reinterpret_cast<char *>(&dst_chars[current_offset]));
current_offset = next_size;
dst_offsets[i] = current_offset;
}
}
template <typename T>
static void fill(T src, String & dst_chars, Int64 min, Int64 max, Float64 max_width)
{
Float64 width = UnicodeBar::getWidth(src, min, max, max_width);
dst_chars.resize(UnicodeBar::getWidthInBytes(width));
UnicodeBar::render(width, dst_chars.data());
}
template <typename T>
static bool executeNumber(const IColumn & src, ColumnString & dst, Int64 min, Int64 max, Float64 max_width)
{
if (const ColumnVector<T> * col = checkAndGetColumn<ColumnVector<T>>(&src))
{
fill(col->getData(), dst.getChars(), dst.getOffsets(), min, max, max_width);
return true;
}
else
return false;
return res_column;
}
};

View File

@ -0,0 +1,85 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunctionImpl.h>
namespace DB
{
namespace
{
/** byteSize() - get the value size in number of bytes for accounting purposes.
*/
class FunctionByteSize : public IFunction
{
public:
static constexpr auto name = "byteSize";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionByteSize>();
}
String getName() const override { return name; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
size_t num_args = arguments.size();
/// If the resulting size is constant, return constant column.
bool all_constant = true;
UInt64 constant_size = 0;
for (size_t arg_num = 0; arg_num < num_args; ++arg_num)
{
if (arguments[arg_num].type->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
{
constant_size += arguments[arg_num].type->getSizeOfValueInMemory();
}
else
{
all_constant = false;
break;
}
}
if (all_constant)
return result_type->createColumnConst(input_rows_count, constant_size);
auto result_col = ColumnUInt64::create(input_rows_count);
auto & vec_res = result_col->getData();
for (size_t arg_num = 0; arg_num < num_args; ++arg_num)
{
const IColumn * column = arguments[arg_num].column.get();
if (arg_num == 0)
for (size_t row_num = 0; row_num < input_rows_count; ++row_num)
vec_res[row_num] = column->byteSizeAt(row_num);
else
for (size_t row_num = 0; row_num < input_rows_count; ++row_num)
vec_res[row_num] += column->byteSizeAt(row_num);
}
return result_col;
}
};
}
void registerFunctionByteSize(FunctionFactory & factory)
{
factory.registerFunction<FunctionByteSize>();
}
}

View File

@ -1,22 +1,14 @@
namespace DB
{
class FunctionFactory;
void registerFunctionYandexConsistentHash(FunctionFactory & factory);
void registerFunctionJumpConsistentHash(FunctionFactory & factory);
#if !defined(ARCADIA_BUILD)
void registerFunctionSumburConsistentHash(FunctionFactory & factory);
#endif
void registerFunctionsConsistentHashing(FunctionFactory & factory)
{
registerFunctionYandexConsistentHash(factory);
registerFunctionJumpConsistentHash(factory);
#if !defined(ARCADIA_BUILD)
registerFunctionSumburConsistentHash(factory);
#endif
}
}

View File

@ -66,6 +66,7 @@ void registerFunctionHasThreadFuzzer(FunctionFactory &);
void registerFunctionInitializeAggregation(FunctionFactory &);
void registerFunctionErrorCodeToName(FunctionFactory &);
void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -132,6 +133,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionInitializeAggregation(factory);
registerFunctionErrorCodeToName(factory);
registerFunctionTcpPort(factory);
registerFunctionByteSize(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -1,38 +0,0 @@
#include "FunctionsConsistentHashing.h"
#include <Functions/FunctionFactory.h>
#include <sumbur.h>
namespace DB
{
namespace
{
struct SumburConsistentHashImpl
{
static constexpr auto name = "sumburConsistentHash";
using HashType = UInt32;
using ResultType = UInt16;
using BucketsType = ResultType;
static constexpr auto max_buckets = static_cast<UInt64>(std::numeric_limits<BucketsType>::max());
static inline ResultType apply(HashType hash, BucketsType n)
{
return static_cast<ResultType>(sumburConsistentHash(hash, n));
}
};
using FunctionSumburConsistentHash = FunctionConsistentHashImpl<SumburConsistentHashImpl>;
}
void registerFunctionSumburConsistentHash(FunctionFactory & factory)
{
factory.registerFunction<FunctionSumburConsistentHash>();
}
}

View File

@ -204,6 +204,7 @@ SRCS(
blockSerializedSize.cpp
blockSize.cpp
buildId.cpp
byteSize.cpp
caseWithExpression.cpp
cbrt.cpp
coalesce.cpp

View File

@ -35,7 +35,7 @@ PEERDIR(
# "Arcadia" build is slightly deficient. It lacks many libraries that we need.
SRCS(
<? find . -name '*.cpp' | grep -i -v -P 'tests|Bitmap|sumbur|abtesting' | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -i -v -P 'tests|Bitmap|abtesting' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -149,4 +149,35 @@ public:
};
/** Buffer that could write data to external memory which came from outside
* Template parameter: ReadBuffer or WriteBuffer
*/
template <typename Base>
class BufferWithOutsideMemory : public Base
{
protected:
Memory<> & memory;
public:
explicit BufferWithOutsideMemory(Memory<> & memory_)
: Base(memory_.data(), memory_.size()), memory(memory_)
{
Base::set(memory.data(), memory.size(), 0);
Base::padded = false;
}
size_t getActualSize()
{
return Base::count();
}
private:
void nextImpl() override final
{
const size_t prev_size = Base::position() - memory.data();
memory.resize(2 * prev_size + 1);
Base::set(memory.data() + prev_size, memory.size() - prev_size, 0);
}
};
}

View File

@ -27,6 +27,8 @@ namespace ErrorCodes
class WriteBuffer : public BufferBase
{
public:
using BufferBase::set;
using BufferBase::position;
WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {}
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }

View File

@ -35,7 +35,7 @@ private:
/// tear down the entire WriteBuffer thing and implement it again,
/// properly.
size_t continuation_size = std::max(size_t(1),
std::max(count(), arena.remainingSpaceInCurrentChunk()));
std::max(count(), arena.remainingSpaceInCurrentMemoryChunk()));
/// allocContinue method will possibly move memory region to new place and modify "begin" pointer.

Some files were not shown because too many files have changed in this diff Show More