Merge branch 'master' into always-detach-parts-with-wrong-partition-id

This commit is contained in:
alexey-milovidov 2021-06-12 02:55:55 +03:00 committed by GitHub
commit 30f0124e77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
208 changed files with 2217 additions and 1235 deletions

View File

@ -2,6 +2,7 @@
#### Upgrade Notes
* One bug has been found after release: [#25187](https://github.com/ClickHouse/ClickHouse/issues/25187).
* Do not upgrade if you have partition key with `UUID`.
* `zstd` compression library is updated to v1.5.0. You may get messages about "checksum does not match" in replication. These messages are expected due to update of compression algorithm and you can ignore them. These messages are informational and do not indicate any kinds of undesired behaviour.
* The setting `compile_expressions` is enabled by default. Although it has been heavily tested on variety of scenarios, if you find some undesired behaviour on your servers, you can try turning this setting off.

View File

@ -1,7 +1,7 @@
if(NOT OS_FREEBSD AND NOT APPLE)
if(NOT OS_FREEBSD)
option(ENABLE_S3 "Enable S3" ${ENABLE_LIBRARIES})
elseif(ENABLE_S3 OR USE_INTERNAL_AWS_S3_LIBRARY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use S3 on Apple or FreeBSD")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use S3 on FreeBSD")
endif()
if(NOT ENABLE_S3)

2
contrib/croaring vendored

@ -1 +1 @@
Subproject commit d8402939b5c9fc134fd4fcf058fe0f7006d2b129
Subproject commit 2c867e9f9c9e2a3a7032791f94c4c7ae3013f6e0

View File

@ -4,9 +4,9 @@ services:
image: sequenceiq/hadoop-docker:2.7.0
hostname: hdfs1
restart: always
ports:
- ${HDFS_NAME_EXTERNAL_PORT}:${HDFS_NAME_INTERNAL_PORT} #50070
- ${HDFS_DATA_EXTERNAL_PORT}:${HDFS_DATA_INTERNAL_PORT} #50075
expose:
- ${HDFS_NAME_PORT}
- ${HDFS_DATA_PORT}
entrypoint: /etc/bootstrap.sh -d
volumes:
- type: ${HDFS_FS:-tmpfs}

View File

@ -0,0 +1,23 @@
version: '2.3'
services:
bridge1:
image: yandex/clickhouse-jdbc-bridge
command: |
/bin/bash -c 'cat << EOF > config/datasources/self.json
{
"self": {
"jdbcUrl": "jdbc:clickhouse://instance:8123/test",
"username": "default",
"password": "",
"maximumPoolSize": 5
}
}
EOF
./docker-entrypoint.sh'
ports:
- 9020:9019
healthcheck:
test: ["CMD", "curl", "-s", "localhost:9019/ping"]
interval: 5s
timeout: 3s
retries: 30

View File

@ -14,9 +14,9 @@ services:
- type: ${KERBERIZED_HDFS_FS:-tmpfs}
source: ${KERBERIZED_HDFS_LOGS:-}
target: /var/log/hadoop-hdfs
ports:
- ${KERBERIZED_HDFS_NAME_EXTERNAL_PORT}:${KERBERIZED_HDFS_NAME_INTERNAL_PORT} #50070
- ${KERBERIZED_HDFS_DATA_EXTERNAL_PORT}:${KERBERIZED_HDFS_DATA_INTERNAL_PORT} #1006
expose:
- ${KERBERIZED_HDFS_NAME_PORT}
- ${KERBERIZED_HDFS_DATA_PORT}
depends_on:
- hdfskerberos
entrypoint: /etc/bootstrap.sh -d
@ -28,4 +28,4 @@ services:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab
- ${KERBERIZED_HDFS_DIR}/../../kerberos_image_config.sh:/config.sh
- /dev/urandom:/dev/random
ports: [88, 749]
expose: [88, 749]

View File

@ -10,7 +10,7 @@ echo '{
"storage-driver": "overlay2",
"insecure-registries" : ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
"registry-mirrors" : ["http://dockerhub-proxy.sas.yp-c.yandex.net:5000"]
}' | dd of=/etc/docker/daemon.json
}' | dd of=/etc/docker/daemon.json 2>/dev/null
dockerd --host=unix:///var/run/docker.sock --host=tcp://0.0.0.0:2375 --default-address-pool base=172.17.0.0/12,size=24 &>/ClickHouse/tests/integration/dockerd.log &

View File

@ -34,6 +34,16 @@ def get_options(i):
if i % 2 == 1:
options.append(" --database=test_{}".format(i))
if i % 7 == 0:
options.append(" --client-option='join_use_nulls=1'")
if i % 14 == 0:
options.append(' --client-option="join_algorithm=\'partial_merge\'"')
if i % 21 == 0:
options.append(' --client-option="join_algorithm=\'auto\'"')
options.append(' --client-option="max_rows_in_join=1000"')
if i == 13:
options.append(" --client-option='memory_tracker_fault_probability=0.00001'")

View File

@ -7,7 +7,7 @@ toc_title: JDBC
Allows ClickHouse to connect to external databases via [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity).
To implement the JDBC connection, ClickHouse uses the separate program [clickhouse-jdbc-bridge](https://github.com/alex-krash/clickhouse-jdbc-bridge) that should run as a daemon.
To implement the JDBC connection, ClickHouse uses the separate program [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) that should run as a daemon.
This engine supports the [Nullable](../../../sql-reference/data-types/nullable.md) data type.
@ -18,19 +18,20 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
columns list...
)
ENGINE = JDBC(dbms_uri, external_database, external_table)
ENGINE = JDBC(datasource_uri, external_database, external_table)
```
**Engine Parameters**
- `dbms_uri` — URI of an external DBMS.
Format: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
- `datasource_uri` — URI or name of an external DBMS.
URI Format: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
Example for MySQL: `jdbc:mysql://localhost:3306/?user=root&password=root`.
- `external_database` — Database in an external DBMS.
- `external_table` — Name of the table in `external_database`.
- `external_table` — Name of the table in `external_database` or a select query like `select * from table1 where column1=1`.
## Usage Example {#usage-example}
@ -81,6 +82,12 @@ FROM jdbc_table
└────────┴──────────────┴───────┴────────────────┘
```
``` sql
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
```
## See Also {#see-also}
- [JDBC table function](../../../sql-reference/table-functions/jdbc.md).

View File

@ -56,13 +56,13 @@ Note, that you can define multiple LDAP servers inside the `ldap_servers` sectio
- `port` — LDAP server port, default is `636` if `enable_tls` is set to `true`, `389` otherwise.
- `bind_dn` — Template used to construct the DN to bind to.
- The resulting DN will be constructed by replacing all `{user_name}` substrings of the template with the actual user name during each authentication attempt.
- `user_dn_detection` - Section with LDAP search parameters for detecting the actual user DN of the bound user.
- `user_dn_detection` Section with LDAP search parameters for detecting the actual user DN of the bound user.
- This is mainly used in search filters for further role mapping when the server is Active Directory. The resulting user DN will be used when replacing `{user_dn}` substrings wherever they are allowed. By default, user DN is set equal to bind DN, but once search is performed, it will be updated with to the actual detected user DN value.
- `base_dn` - Template used to construct the base DN for the LDAP search.
- `base_dn` Template used to construct the base DN for the LDAP search.
- The resulting DN will be constructed by replacing all `{user_name}` and `{bind_dn}` substrings of the template with the actual user name and bind DN during the LDAP search.
- `scope` - Scope of the LDAP search.
- `scope` Scope of the LDAP search.
- Accepted values are: `base`, `one_level`, `children`, `subtree` (the default).
- `search_filter` - Template used to construct the search filter for the LDAP search.
- `search_filter` Template used to construct the search filter for the LDAP search.
- The resulting filter will be constructed by replacing all `{user_name}`, `{bind_dn}`, and `{base_dn}` substrings of the template with the actual user name, bind DN, and base DN during the LDAP search.
- Note, that the special characters must be escaped properly in XML.
- `verification_cooldown` — A period of time, in seconds, after a successful bind attempt, during which the user will be assumed to be successfully authenticated for all consecutive requests without contacting the LDAP server.
@ -108,7 +108,6 @@ Note, that user `my_user` refers to `my_ldap_server`. This LDAP server must be c
When SQL-driven [Access Control and Account Management](../access-rights.md#access-control) is enabled, users that are authenticated by LDAP servers can also be created using the [CREATE USER](../../sql-reference/statements/create/user.md#create-user-statement) statement.
Query:
```sql

View File

@ -4,7 +4,7 @@ toc_priority: 144
# sumCount {#agg_function-sumCount}
Calculates the sum of the numbers and counts the number of rows at the same time.
Calculates the sum of the numbers and counts the number of rows at the same time. The function is used by ClickHouse query optimizer: if there are multiple `sum`, `count` or `avg` functions in a query, they can be replaced to single `sumCount` function to reuse the calculations. The function is rarely needed to use explicitly.
**Syntax**

View File

@ -5,6 +5,9 @@ toc_priority: 145
# sumKahan {#agg_function-sumKahan}
Calculates the sum of the numbers with [Kahan compensated summation algorithm](https://en.wikipedia.org/wiki/Kahan_summation_algorithm)
Slower than [sum](./sum.md) function.
The compensation works only for [Float](../../../sql-reference/data-types/float.md) types.
**Syntax**

View File

@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
Similar to `topK` but takes one additional argument of integer type - `weight`. Every value is accounted `weight` times for frequency calculation.
Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves). Additionally, the weight of the value is taken into account.
**Syntax**
@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Arguments**
- `N` — The number of elements to return.
**Arguments**
- `x` — The value.
- `weight` — The weight. [UInt8](../../../sql-reference/data-types/int-uint.md).
- `weight` — The weight. Every value is accounted `weight` times for frequency calculation. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Returned value**
@ -40,3 +37,7 @@ Result:
│ [999,998,997,996,995,994,993,992,991,990] │
└───────────────────────────────────────────┘
```
**See Also**
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)

View File

@ -5,9 +5,9 @@ toc_title: jdbc
# jdbc {#table-function-jdbc}
`jdbc(jdbc_connection_uri, schema, table)` - returns table that is connected via JDBC driver.
`jdbc(datasource, schema, table)` - returns table that is connected via JDBC driver.
This table function requires separate `clickhouse-jdbc-bridge` program to be running.
This table function requires separate [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) program to be running.
It supports Nullable types (based on DDL of remote table that is queried).
**Examples**
@ -17,10 +17,22 @@ SELECT * FROM jdbc('jdbc:mysql://localhost:3306/?user=root&password=root', 'sche
```
``` sql
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'schema', 'table')
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'select * from schema.table')
```
``` sql
SELECT * FROM jdbc('datasource://mysql-local', 'schema', 'table')
SELECT * FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT *
FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT a.datasource AS server1, b.datasource AS server2, b.name AS db
FROM jdbc('mysql-dev?datasource_column', 'show databases') a
INNER JOIN jdbc('self?datasource_column', 'show databases') b ON a.Database = b.name
```
[Original article](https://clickhouse.tech/docs/en/query_language/table_functions/jdbc/) <!--hide-->

View File

@ -9,7 +9,7 @@ toc_title: JDBC
ClickHouseが外部データベースに接続できるようにする [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity).
JDBC接続を実装するには、ClickHouseは別のプログラムを使用します [clickhouse-jdbc-bridge](https://github.com/alex-krash/clickhouse-jdbc-bridge) うにしてくれました。
JDBC接続を実装するには、ClickHouseは別のプログラムを使用します [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) うにしてくれました。
このエンジンは [Null可能](../../../sql-reference/data-types/nullable.md) データ型。
@ -20,19 +20,19 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
columns list...
)
ENGINE = JDBC(dbms_uri, external_database, external_table)
ENGINE = JDBC(datasource_uri, external_database, external_table)
```
**エンジン変数**
- `dbms_uri` — URI of an external DBMS.
- `datasource_uri` — URI or name of an external DBMS.
形式: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
URI形式: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
MySQLの例: `jdbc:mysql://localhost:3306/?user=root&password=root`.
- `external_database` — Database in an external DBMS.
- `external_table` — Name of the table in `external_database`.
- `external_table` — Name of the table in `external_database` or a select query like `select * from table1 where column1=1`.
## 使用例 {#usage-example}
@ -83,6 +83,12 @@ FROM jdbc_table
└────────┴──────────────┴───────┴────────────────┘
```
``` sql
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
```
## も参照。 {#see-also}
- [JDBCテーブル関数](../../../sql-reference/table-functions/jdbc.md).

View File

@ -7,9 +7,9 @@ toc_title: jdbc
# jdbc {#table-function-jdbc}
`jdbc(jdbc_connection_uri, schema, table)` -JDBCドライバ経由で接続されたテーブルを返します。
`jdbc(datasource, schema, table)` -JDBCドライバ経由で接続されたテーブルを返します。
このテーブル関数には、別々の `clickhouse-jdbc-bridge` 実行するプログラム。
このテーブル関数には、別々の [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) 実行するプログラム。
Null許容型をサポートします(照会されるリモートテーブルのDDLに基づきます)。
**例**
@ -19,11 +19,22 @@ SELECT * FROM jdbc('jdbc:mysql://localhost:3306/?user=root&password=root', 'sche
```
``` sql
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'schema', 'table')
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'select * from schema.table')
```
``` sql
SELECT * FROM jdbc('datasource://mysql-local', 'schema', 'table')
SELECT * FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT *
FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT a.datasource AS server1, b.datasource AS server2, b.name AS db
FROM jdbc('mysql-dev?datasource_column', 'show databases') a
INNER JOIN jdbc('self?datasource_column', 'show databases') b ON a.Database = b.name
```
[元の記事](https://clickhouse.tech/docs/en/query_language/table_functions/jdbc/) <!--hide-->

View File

@ -7,7 +7,7 @@ toc_title: JDBC
Позволяет ClickHouse подключаться к внешним базам данных с помощью [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity).
Для реализации соединения по JDBC ClickHouse использует отдельную программу [clickhouse-jdbc-bridge](https://github.com/alex-krash/clickhouse-jdbc-bridge), которая должна запускаться как демон.
Для реализации соединения по JDBC ClickHouse использует отдельную программу [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge), которая должна запускаться как демон.
Движок поддерживает тип данных [Nullable](../../../engines/table-engines/integrations/jdbc.md).
@ -15,20 +15,20 @@ toc_title: JDBC
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
ENGINE = JDBC(dbms_uri, external_database, external_table)
ENGINE = JDBC(datasource_uri, external_database, external_table)
```
**Параметры движка**
- `dbms_uri` — URI внешней СУБД.
- `datasource_uri` — URI или имя внешней СУБД.
Формат: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
URI Формат: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
Пример для MySQL: `jdbc:mysql://localhost:3306/?user=root&password=root`.
- `external_database` — база данных во внешней СУБД.
- `external_table` — таблица в `external_database`.
- `external_table` — таблицы в `external_database` или запросе выбора, например` select * from table1, где column1 = 1`.
## Пример использования {#primer-ispolzovaniia}
@ -85,6 +85,12 @@ FROM jdbc_table
└────────┴──────────────┴───────┴────────────────┘
```
``` sql
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
```
## Смотрите также {#smotrite-takzhe}
- [Табличная функция JDBC](../../../engines/table-engines/integrations/jdbc.md).

View File

@ -17,6 +17,7 @@
<yandex>
<!- ... -->
<ldap_servers>
<!- Typical LDAP server. -->
<my_ldap_server>
<host>localhost</host>
<port>636</port>
@ -31,6 +32,18 @@
<tls_ca_cert_dir>/path/to/tls_ca_cert_dir</tls_ca_cert_dir>
<tls_cipher_suite>ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:AES256-GCM-SHA384</tls_cipher_suite>
</my_ldap_server>
<!- Typical Active Directory with configured user DN detection for further role mapping. -->
<my_ad_server>
<host>localhost</host>
<port>389</port>
<bind_dn>EXAMPLE\{user_name}</bind_dn>
<user_dn_detection>
<base_dn>CN=Users,DC=example,DC=com</base_dn>
<search_filter>(&amp;(objectClass=user)(sAMAccountName={user_name}))</search_filter>
</user_dn_detection>
<enable_tls>no</enable_tls>
</my_ad_server>
</ldap_servers>
</yandex>
```
@ -41,9 +54,18 @@
- `host` — имя хоста сервера LDAP или его IP. Этот параметр обязательный и не может быть пустым.
- `port` — порт сервера LDAP. Если настройка `enable_tls` равна `true`, то по умолчанию используется порт `636`, иначе — порт `389`.
- `bind_dn` — шаблон для создания DN для привязки.
- `bind_dn` — шаблон для создания DN подключения.
- При формировании DN все подстроки `{user_name}` в шаблоне будут заменяться на фактическое имя пользователя при каждой попытке аутентификации.
- `verification_cooldown` — промежуток времени (в секундах) после успешной попытки привязки, в течение которого пользователь будет считаться аутентифицированным и сможет выполнять запросы без повторного обращения к серверам LDAP.
- `user_dn_detection` — секция с параметрами LDAP поиска для определения фактического значения DN подключенного пользователя.
- Это в основном используется в фильтрах поиска для дальнейшего сопоставления ролей, когда сервер является Active Directory. Полученный DN пользователя будет использоваться при замене подстрок `{user_dn}` везде, где они разрешены. По умолчанию DN пользователя устанавливается равным DN подключения, но после выполнения поиска он будет обновлен до фактического найденного значения DN пользователя.
- `base_dn` — шаблон для создания базового DN для LDAP поиска.
- При формировании DN все подстроки `{user_name}` и `{bind_dn}` в шаблоне будут заменяться на фактическое имя пользователя и DN подключения соответственно при каждом LDAP поиске.
- `scope` — область LDAP поиска.
- Возможные значения: `base`, `one_level`, `children`, `subtree` (по умолчанию).
- `search_filter` — шаблон для создания фильтра для каждого LDAP поиска.
- При формировании фильтра все подстроки `{user_name}`, `{bind_dn}`, `{user_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения, DN пользователя и базовый DN соответственно при каждом LDAP поиске.
- Обратите внимание, что специальные символы должны быть правильно экранированы в XML.
- `verification_cooldown` — промежуток времени (в секундах) после успешной попытки подключения, в течение которого пользователь будет считаться аутентифицированным и сможет выполнять запросы без повторного обращения к серверам LDAP.
- Чтобы отключить кеширование и заставить обращаться к серверу LDAP для каждого запроса аутентификации, укажите `0` (значение по умолчанию).
- `enable_tls` — флаг, включающий использование защищенного соединения с сервером LDAP.
- Укажите `no` для использования текстового протокола `ldap://` (не рекомендовано).
@ -106,7 +128,7 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
<yandex>
<!- ... -->
<user_directories>
<!- ... -->
<!- Typical LDAP server. -->
<ldap>
<server>my_ldap_server</server>
<roles>
@ -121,6 +143,18 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
<prefix>clickhouse_</prefix>
</role_mapping>
</ldap>
<!- Typical Active Directory with role mapping that relies on the detected user DN. -->
<ldap>
<server>my_ad_server</server>
<role_mapping>
<base_dn>CN=Users,DC=example,DC=com</base_dn>
<attribute>CN</attribute>
<scope>subtree</scope>
<search_filter>(&amp;(objectClass=group)(member={user_dn}))</search_filter>
<prefix>clickhouse_</prefix>
</role_mapping>
</ldap>
</user_directories>
</yandex>
```
@ -135,14 +169,14 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
- `role_mapping` — секция c параметрами LDAP поиска и правилами отображения.
- При аутентификации пользователя, пока еще связанного с LDAP, производится LDAP поиск с помощью `search_filter` и имени этого пользователя. Для каждой записи, найденной в ходе поиска, выделяется значение указанного атрибута. У каждого атрибута, имеющего указанный префикс, этот префикс удаляется, а остальная часть значения становится именем локальной роли, определенной в ClickHouse, причем предполагается, что эта роль была ранее создана запросом [CREATE ROLE](../../sql-reference/statements/create/role.md#create-role-statement) до этого.
- Внутри одной секции `ldap` может быть несколько секций `role_mapping`. Все они будут применены.
- `base_dn` — шаблон, который используется для создания базового DN для LDAP поиска.
- При формировании DN все подстроки `{user_name}` и `{bind_dn}` в шаблоне будут заменяться на фактическое имя пользователя и DN привязки соответственно при каждом LDAP поиске.
- `scope`Область LDAP поиска.
- `base_dn` — шаблон для создания базового DN для LDAP поиска.
- При формировании DN все подстроки `{user_name}`, `{bind_dn}` и `{user_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения и DN пользователя соответственно при каждом LDAP поиске.
- `scope`область LDAP поиска.
- Возможные значения: `base`, `one_level`, `children`, `subtree` (по умолчанию).
- `search_filter` — шаблон, который используется для создания фильтра для каждого LDAP поиска.
- при формировании фильтра все подстроки `{user_name}`, `{bind_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN привязки и базовый DN соответственно при каждом LDAP поиске.
- `search_filter` — шаблон для создания фильтра для каждого LDAP поиска.
- При формировании фильтра все подстроки `{user_name}`, `{bind_dn}`, `{user_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения, DN пользователя и базовый DN соответственно при каждом LDAP поиске.
- Обратите внимание, что специальные символы должны быть правильно экранированы в XML.
- `attribute` — имя атрибута, значение которого будет возвращаться LDAP поиском.
- `attribute` — имя атрибута, значение которого будет возвращаться LDAP поиском. По умолчанию: `cn`.
- `prefix` — префикс, который, как предполагается, будет находиться перед началом каждой строки в исходном списке строк, возвращаемых LDAP поиском. Префикс будет удален из исходных строк, а сами они будут рассматриваться как имена локальных ролей. По умолчанию: пустая строка.
[Оригинальная статья](https://clickhouse.tech/docs/en/operations/external-authenticators/ldap) <!--hide-->

View File

@ -4,7 +4,9 @@ toc_priority: 145
# sumKahan {#agg_function-sumKahan}
Вычисляет сумму с использованием [компенсационного суммирования по алгоритму Кэхэна](https://ru.wikipedia.org/wiki/Алгоритм_Кэхэна)
Вычисляет сумму с использованием [компенсационного суммирования по алгоритму Кэхэна](https://ru.wikipedia.org/wiki/Алгоритм_Кэхэна).
Работает медленнее функции [sum](./sum.md).
Компенсация работает только для [Float](../../../sql-reference/data-types/float.md) типов.
**Синтаксис**

View File

@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
Аналогична `topK`, но дополнительно принимает положительный целочисленный параметр `weight`. Каждое значение учитывается `weight` раз при расчёте частоты.
Возвращает массив наиболее часто встречающихся значений в указанном столбце. Результирующий массив упорядочен по убыванию частоты значения (не по самим значениям). Дополнительно учитывается вес значения.
**Синтаксис**
@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Аргументы**
- `N` — количество элементов для выдачи.
**Аргументы**
- `x` — значение.
- `weight` — вес. [UInt8](../../../sql-reference/data-types/int-uint.md).
- `weight` — вес. Каждое значение учитывается `weight` раз при расчёте частоты. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Возвращаемое значение**
@ -41,3 +38,6 @@ SELECT topKWeighted(10)(number, number) FROM numbers(1000)
└───────────────────────────────────────────┘
```
**Смотрите также**
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)

View File

@ -319,13 +319,12 @@ GRANT INSERT(x,y) ON db.table TO john
Разрешает выполнять запросы [DROP](misc.md#drop) и [DETACH](misc.md#detach-statement) в соответствии со следующей иерархией привилегий:
- `DROP`. Уровень:
- `DROP`. Уровень: `GROUP`
- `DROP DATABASE`. Уровень: `DATABASE`
- `DROP TABLE`. Уровень: `TABLE`
- `DROP VIEW`. Уровень: `VIEW`
- `DROP DICTIONARY`. Уровень: `DICTIONARY`
### TRUNCATE {#grant-truncate}
Разрешает выполнять запросы [TRUNCATE](../../sql-reference/statements/truncate.md).

View File

@ -5,9 +5,9 @@ toc_title: jdbc
# jdbc {#jdbc}
`jdbc(jdbc_connection_uri, schema, table)` - возвращает таблицу, соединение с которой происходит через JDBC-драйвер.
`jdbc(datasource, schema, table)` - возвращает таблицу, соединение с которой происходит через JDBC-драйвер.
Для работы этой табличной функции требуется отдельно запускать приложение clickhouse-jdbc-bridge.
Для работы этой табличной функции требуется отдельно запускать приложение [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge).
Данная функция поддерживает Nullable типы (на основании DDL таблицы к которой происходит запрос).
**Пример**
@ -17,10 +17,22 @@ SELECT * FROM jdbc('jdbc:mysql://localhost:3306/?user=root&password=root', 'sche
```
``` sql
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'schema', 'table')
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'select * from schema.table')
```
``` sql
SELECT * FROM jdbc('datasource://mysql-local', 'schema', 'table')
SELECT * FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT *
FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT a.datasource AS server1, b.datasource AS server2, b.name AS db
FROM jdbc('mysql-dev?datasource_column', 'show databases') a
INNER JOIN jdbc('self?datasource_column', 'show databases') b ON a.Database = b.name
```
[Оригинальная статья](https://clickhouse.tech/docs/en/query_language/table_functions/jdbc/) <!--hide-->

View File

@ -122,14 +122,14 @@ FROM s3('https://storage.yandexcloud.net/my-test-bucket-768/big_prefix/file-{000
Запишем данные в файл `test-data.csv.gz`:
``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);
```
Запишем данные из существующей таблицы в файл `test-data.csv.gz`:
``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;
```

View File

@ -1,27 +1,24 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 36
toc_title: HDFS
---
# HDFS {#table_engines-hdfs}
该引擎提供了集成 [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) 生态系统通过允许管理数据 [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html)通过ClickHouse. 这个引擎是相似的
[文件](../special/file.md#table_engines-file) 和 [URL](../special/url.md#table_engines-url) 引擎但提供Hadoop特定功能。
这个引擎提供了与 [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) 生态系统的集成,允许通过 ClickHouse 管理 [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) 上的数据。这个引擎类似于
[文件](../../../engines/table-engines/special/file.md#table_engines-file) 和 [URL](../../../engines/table-engines/special/url.md#table_engines-url) 引擎,但提供Hadoop特定功能。
## 用 {#usage}
## 用 {#usage}
``` sql
ENGINE = HDFS(URI, format)
```
`URI` 参数是HDFS中的整个文件URI。
`format` 参数指定一种可用的文件格式。 执行
`SELECT` 查询时,格式必须支持输入,并执行
`INSERT` queries for output. The available formats are listed in the
[格式](../../../interfaces/formats.md#formats) 科。
路径部分 `URI` 可能包含水珠。 在这种情况下,表将是只读的。
`URI` 参数是 HDFS 中整个文件的 URI。
`format` 参数指定一种可用的文件格式。 执行
`SELECT` 查询时,格式必须支持输入,以及执行
`INSERT` 查询时,格式必须支持输出. 你可以在 [格式](../../../interfaces/formats.md#formats) 章节查看可用的格式。
路径部分 `URI` 可能包含 glob 通配符。 在这种情况下,表将是只读的。
**示例:**
@ -58,20 +55,20 @@ SELECT * FROM hdfs_engine_table LIMIT 2
- 索引。
- 复制。
**路径中的水珠**
**路径中的通配符**
多个路径组件可以具有globs。 对于正在处理的文件应该存在并匹配到整个路径模式。 文件列表确定在 `SELECT` (不在 `CREATE` 时刻)。
多个路径组件可以具有 globs。 对于正在处理的文件应该存在并匹配到整个路径模式。 文件列表的确定是在 `SELECT` 的时候进行(而不是在 `CREATE` 的时候)。
- `*`Substitutes any number of any characters except `/` 包括空字符串。
- `?`Substitutes any single character.
- `{some_string,another_string,yet_another_one}`Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}`Substitutes any number in range from N to M including both borders.
- `*`替代任何数量的任何字符,除了 `/` 以及空字符串。
- `?`代替任何单个字符.
- `{some_string,another_string,yet_another_one}`替代任何字符串 `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}`替换 N 到 M 范围内的任何数字,包括两个边界的值.
建筑与 `{}` 类似于 [远程](../../../sql-reference/table-functions/remote.md) 表功能
`{}` 的结构类似于 [远程](../../../sql-reference/table-functions/remote.md) 表函数
**示例**
1. 假设我们在HDFS上有几个TSV格式的文件其中包含以下Uri:
1. 假设我们在 HDFS 上有几个 TSV 格式的文件,文件的 URI 如下:
- hdfs://hdfs1:9000/some_dir/some_file_1
- hdfs://hdfs1:9000/some_dir/some_file_2
@ -111,10 +108,98 @@ CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs
CREARE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## 配置 {#configuration}
与 GraphiteMergeTree 类似HDFS 引擎支持使用 ClickHouse 配置文件进行扩展配置。有两个配置键可以使用:全局 (`hdfs`) 和用户级别 (`hdfs_*`)。首先全局配置生效,然后用户级别配置生效 (如果用户级别配置存在) 。
``` xml
<!-- HDFS 引擎类型的全局配置选项 -->
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- 用户 "root" 的指定配置 -->
<hdfs_root>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
```
### 可选配置选项及其默认值的列表
#### libhdfs3 支持的
| **参数** | **默认值** |
| rpc\_client\_connect\_tcpnodelay | true |
| dfs\_client\_read\_shortcircuit | true |
| output\_replace-datanode-on-failure | true |
| input\_notretry-another-node | false |
| input\_localread\_mappedfile | true |
| dfs\_client\_use\_legacy\_blockreader\_local | false |
| rpc\_client\_ping\_interval | 10 * 1000 |
| rpc\_client\_connect\_timeout | 600 * 1000 |
| rpc\_client\_read\_timeout | 3600 * 1000 |
| rpc\_client\_write\_timeout | 3600 * 1000 |
| rpc\_client\_socekt\_linger\_timeout | -1 |
| rpc\_client\_connect\_retry | 10 |
| rpc\_client\_timeout | 3600 * 1000 |
| dfs\_default\_replica | 3 |
| input\_connect\_timeout | 600 * 1000 |
| input\_read\_timeout | 3600 * 1000 |
| input\_write\_timeout | 3600 * 1000 |
| input\_localread\_default\_buffersize | 1 * 1024 * 1024 |
| dfs\_prefetchsize | 10 |
| input\_read\_getblockinfo\_retry | 3 |
| input\_localread\_blockinfo\_cachesize | 1000 |
| input\_read\_max\_retry | 60 |
| output\_default\_chunksize | 512 |
| output\_default\_packetsize | 64 * 1024 |
| output\_default\_write\_retry | 10 |
| output\_connect\_timeout | 600 * 1000 |
| output\_read\_timeout | 3600 * 1000 |
| output\_write\_timeout | 3600 * 1000 |
| output\_close\_timeout | 3600 * 1000 |
| output\_packetpool\_size | 1024 |
| output\_heeartbeat\_interval | 10 * 1000 |
| dfs\_client\_failover\_max\_attempts | 15 |
| dfs\_client\_read\_shortcircuit\_streams\_cache\_size | 256 |
| dfs\_client\_socketcache\_expiryMsec | 3000 |
| dfs\_client\_socketcache\_capacity | 16 |
| dfs\_default\_blocksize | 64 * 1024 * 1024 |
| dfs\_default\_uri | "hdfs://localhost:9000" |
| hadoop\_security\_authentication | "simple" |
| hadoop\_security\_kerberos\_ticket\_cache\_path | "" |
| dfs\_client\_log\_severity | "INFO" |
| dfs\_domain\_socket\_path | "" |
[HDFS 配置参考](https://hawq.apache.org/docs/userguide/2.3.0.0-incubating/reference/HDFSConfigurationParameterReference.html) 也许会解释一些参数的含义.
#### ClickHouse 额外的配置 {#clickhouse-extras}
| **参数** | **默认值** |
|hadoop\_kerberos\_keytab | "" |
|hadoop\_kerberos\_principal | "" |
|hadoop\_kerberos\_kinit\_command | kinit |
#### 限制 {#limitations}
* hadoop\_security\_kerberos\_ticket\_cache\_path 只能在全局配置, 不能指定用户
## Kerberos 支持 {#kerberos-support}
如果 hadoop\_security\_authentication 参数的值为 'kerberos' ClickHouse 将通过 Kerberos 设施进行认证。
[这里的](#clickhouse-extras) 参数和 hadoop\_security\_kerberos\_ticket\_cache\_path 也许会有帮助.
注意,由于 libhdfs3 的限制,只支持老式的方法。
数据节点的安全通信无法由 SASL 保证 ( HADOOP\_SECURE\_DN\_USER 是这种安全方法的一个可靠指标)
使用 tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh 脚本作为参考。
如果指定了 hadoop\_kerberos\_keytab, hadoop\_kerberos\_principal 或者 hadoop\_kerberos\_kinit\_command ,将会调用 kinit 工具.在此情况下, hadoop\_kerberos\_keytab 和 hadoop\_kerberos\_principal 参数是必须配置的. kinit 工具和 krb5 配置文件是必要的.
## 虚拟列 {#virtual-columns}
- `_path` — Path to the file.
- `_file` — Name of the file.
- `_path`文件路径.
- `_file`文件名.
**另请参阅**

View File

@ -8,7 +8,7 @@ toc_title: JDBC表引擎
允许CH通过 [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity) 连接到外部数据库。
要实现JDBC连接CH需要使用以后台进程运行的程序 [clickhouse-jdbc-bridge](https://github.com/alex-krash/clickhouse-jdbc-bridge)。
要实现JDBC连接CH需要使用以后台进程运行的程序 [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge)。
该引擎支持 [Nullable](../../../sql-reference/data-types/nullable.md) 数据类型。
@ -20,19 +20,19 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
columns list...
)
ENGINE = JDBC(dbms_uri, external_database, external_table)
ENGINE = JDBC(datasource_uri, external_database, external_table)
```
**引擎参数**
- `dbms_uri` — 外部DBMS的uri.
- `datasource_uri` — 外部DBMS的URI或名字.
格式: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
URI格式: `jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>`.
MySQL示例: `jdbc:mysql://localhost:3306/?user=root&password=root`.
- `external_database` — 外部DBMS的数据库名.
- `external_table``external_database`中的外部表名.
- `external_table``external_database`中的外部表名或类似`select * from table1 where column1=1`的查询语句.
## 用法示例 {#usage-example}
@ -85,6 +85,12 @@ FROM jdbc_table
└────────┴──────────────┴───────┴────────────────┘
```
``` sql
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
```
## 参见 {#see-also}
- [JDBC表函数](../../../sql-reference/table-functions/jdbc.md).

View File

@ -5,9 +5,9 @@ toc_title: jdbc
# jdbc {#table-function-jdbc}
`jdbc(jdbc_connection_uri, schema, table)` -返回通过JDBC驱动程序连接的表。
`jdbc(datasource, schema, table)` -返回通过JDBC驱动程序连接的表。
此表函数需要单独的 `clickhouse-jdbc-bridge` 程序才能运行。
此表函数需要单独的 [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) 程序才能运行。
它支持可空类型基于查询的远程表的DDL
**示例**
@ -17,11 +17,22 @@ SELECT * FROM jdbc('jdbc:mysql://localhost:3306/?user=root&password=root', 'sche
```
``` sql
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'schema', 'table')
SELECT * FROM jdbc('mysql://localhost:3306/?user=root&password=root', 'select * from schema.table')
```
``` sql
SELECT * FROM jdbc('datasource://mysql-local', 'schema', 'table')
SELECT * FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT *
FROM jdbc('mysql-dev?p1=233', 'num Int32', 'select toInt32OrZero(''{{p1}}'') as num')
```
``` sql
SELECT a.datasource AS server1, b.datasource AS server2, b.name AS db
FROM jdbc('mysql-dev?datasource_column', 'show databases') a
INNER JOIN jdbc('self?datasource_column', 'show databases') b ON a.Database = b.name
```
[原始文章](https://clickhouse.tech/docs/en/query_language/table_functions/jdbc/) <!--hide-->

View File

@ -1,7 +1,7 @@
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionBitwise.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionBoundingRatio.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -3,7 +3,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -1,11 +1,9 @@
#include <Common/StringUtils/StringUtils.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -100,7 +100,7 @@ public:
}
/// Reset the state to specified value. This function is not the part of common interface.
void set(AggregateDataPtr __restrict place, UInt64 new_count)
void set(AggregateDataPtr __restrict place, UInt64 new_count) const
{
data(place).count = new_count;
}

View File

@ -2,7 +2,7 @@
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionEntropy.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,7 +16,8 @@ namespace ErrorCodes
namespace
{
AggregateFunctionPtr createAggregateFunctionEntropy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionEntropy(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
if (argument_types.empty())

View File

@ -17,10 +17,10 @@
#include <Common/CurrentThread.h>
#include <Poco/String.h>
#include "registerAggregateFunctions.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
struct Settings;
@ -95,7 +95,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
// nullability themselves. Another special case is functions from Nothing
// that are rewritten to AggregateFunctionNothing, in this case
// nested_function is nullptr.
if (nested_function && nested_function->asWindowFunction())
if (nested_function && nested_function->isOnlyWindowFunction())
{
return nested_function;
}

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionForEach.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -4,7 +4,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -50,7 +49,8 @@ inline AggregateFunctionPtr createAggregateFunctionGroupArrayImpl(const DataType
}
AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionGroupArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,7 +16,8 @@ namespace ErrorCodes
namespace
{
AggregateFunctionPtr createAggregateFunctionGroupArrayInsertAt(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionGroupArrayInsertAt(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);

View File

@ -5,7 +5,6 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -58,7 +57,8 @@ inline AggregateFunctionPtr createAggregateFunctionMovingImpl(const std::string
}
template <template <typename, typename> class Function>
AggregateFunctionPtr createAggregateFunctionMoving(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionMoving(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);

View File

@ -2,11 +2,11 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include "registerAggregateFunctions.h"
// TODO include this last because of a broken roaring header. See the comment inside.
#include <AggregateFunctions/AggregateFunctionGroupBitmap.h>
namespace DB
{
struct Settings;
@ -18,9 +18,8 @@ namespace ErrorCodes
namespace
{
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
static IAggregateFunction * createWithIntegerType(const IDataType & argument_type, TArgs && ... args)
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> typename Data, typename... TArgs>
static IAggregateFunction * createWithIntegerType(const IDataType & argument_type, TArgs &&... args)
{
WhichDataType which(argument_type);
if (which.idx == TypeIndex::UInt8) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>(std::forward<TArgs>(args)...);
@ -34,8 +33,9 @@ namespace
return nullptr;
}
template <template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionBitmap(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename> typename Data>
AggregateFunctionPtr createAggregateFunctionBitmap(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
@ -57,12 +57,13 @@ namespace
}
// Additional aggregate functions to manipulate bitmaps.
template <template <typename, typename> class AggregateFunctionTemplate>
AggregateFunctionPtr
createAggregateFunctionBitmapL2(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename, typename> typename AggregateFunctionTemplate>
AggregateFunctionPtr createAggregateFunctionBitmapL2(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
DataTypePtr argument_type_ptr = argument_types[0];
WhichDataType which(*argument_type_ptr);
if (which.idx != TypeIndex::AggregateFunction)
@ -70,11 +71,15 @@ namespace
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
/// groupBitmap needs to know about the data type that was used to create bitmaps.
/// We need to look inside the type of its argument to obtain it.
const DataTypeAggregateFunction & datatype_aggfunc = dynamic_cast<const DataTypeAggregateFunction &>(*argument_type_ptr);
AggregateFunctionPtr aggfunc = datatype_aggfunc.getFunction();
argument_type_ptr = aggfunc->getArgumentTypes()[0];
DataTypePtr nested_argument_type_ptr = aggfunc->getArgumentTypes()[0];
AggregateFunctionPtr res(createWithIntegerType<AggregateFunctionTemplate, AggregateFunctionGroupBitmapData>(
*argument_type_ptr, argument_type_ptr));
*nested_argument_type_ptr, argument_type_ptr));
if (!res)
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,

View File

@ -9,15 +9,19 @@
// TODO include this last because of a broken roaring header. See the comment inside.
#include <AggregateFunctions/AggregateFunctionGroupBitmapData.h>
namespace DB
{
struct Settings;
/// Counts bitmap operation on numbers.
template <typename T, typename Data>
class AggregateFunctionBitmap final : public IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>
{
public:
AggregateFunctionBitmap(const DataTypePtr & type) : IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>({type}, {}) { }
AggregateFunctionBitmap(const DataTypePtr & type)
: IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>({type}, {})
{
}
String getName() const override { return Data::name(); }
@ -46,6 +50,7 @@ public:
};
/// This aggregate function takes the states of AggregateFunctionBitmap as its argument.
template <typename T, typename Data, typename Policy>
class AggregateFunctionBitmapL2 final : public IAggregateFunctionDataHelper<Data, AggregateFunctionBitmapL2<T, Data, Policy>>
{
@ -61,6 +66,11 @@ public:
bool allocatesMemoryInArena() const override { return false; }
DataTypePtr getStateType() const override
{
return this->argument_types.at(0);
}
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
Data & data_lhs = this->data(place);
@ -105,6 +115,7 @@ public:
}
};
template <typename Data>
class BitmapAndPolicy
{

View File

@ -1,9 +1,9 @@
#pragma once
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/noncopyable.hpp>
#include <Common/HashTable/SmallTable.h>
#include <Common/PODArray.h>
@ -14,9 +14,9 @@
#include <roaring.hh>
#include <roaring64map.hh>
namespace DB
{
struct Settings;
enum BitmapKind
{
@ -24,6 +24,7 @@ enum BitmapKind
Bitmap = 1
};
/**
* For a small number of values - an array of fixed size "on the stack".
* For large, roaring bitmap is allocated.

View File

@ -4,7 +4,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -73,7 +72,8 @@ inline AggregateFunctionPtr createAggregateFunctionGroupUniqArrayImpl(const std:
}
AggregateFunctionPtr createAggregateFunctionGroupUniqArray(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionGroupUniqArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);

View File

@ -4,7 +4,7 @@
#include <AggregateFunctions/Helpers.h>
#include <Common/FieldVisitors.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionIf.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
#include "AggregateFunctionNull.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -11,7 +11,6 @@
#include "AggregateFunctionFactory.h"
#include "FactoryHelpers.h"
#include "Helpers.h"
#include "registerAggregateFunctions.h"
namespace DB
@ -25,13 +24,14 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
using FuncLinearRegression = AggregateFunctionMLMethod<LinearModelData, NameLinearRegression>;
using FuncLogisticRegression = AggregateFunctionMLMethod<LinearModelData, NameLogisticRegression>;
template <class Method>
AggregateFunctionPtr
createAggregateFunctionMLMethod(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <typename Method>
AggregateFunctionPtr createAggregateFunctionMLMethod(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
if (parameters.size() > 4)
throw Exception(

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionMannWhitney.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
#include <AggregateFunctions/Helpers.h>
@ -17,7 +16,8 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionMannWhitneyUTest(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionMannWhitneyUTest(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -11,12 +10,14 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionMax(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionMax(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMaxData>(name, argument_types, parameters, settings));
}
AggregateFunctionPtr createAggregateFunctionArgMax(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionArgMax(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMaxData>(name, argument_types, parameters, settings));
}

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionMaxIntersections.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionMerge.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -50,6 +50,11 @@ public:
return nested_func->getReturnType();
}
DataTypePtr getStateType() const override
{
return nested_func->getStateType();
}
void create(AggregateDataPtr __restrict place) const override
{
nested_func->create(place);

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -11,12 +10,14 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionMin(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionMin(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMinData>(name, argument_types, parameters, settings));
}
AggregateFunctionPtr createAggregateFunctionArgMin(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionArgMin(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMinData>(name, argument_types, parameters, settings));
}

View File

@ -700,13 +700,11 @@ template <typename Data>
class AggregateFunctionsSingleValue final : public IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>
{
private:
DataTypePtr type;
SerializationPtr serialization;
public:
AggregateFunctionsSingleValue(const DataTypePtr & type_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type_}, {})
, type(this->argument_types[0])
AggregateFunctionsSingleValue(const DataTypePtr & type)
: IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type}, {})
, serialization(type->getDefaultSerialization())
{
if (StringRef(Data::name()) == StringRef("min")
@ -722,7 +720,7 @@ public:
DataTypePtr getReturnType() const override
{
return type;
return this->argument_types.at(0);
}
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override

View File

@ -4,12 +4,10 @@
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -1,12 +1,11 @@
#include <AggregateFunctions/AggregateFunctionOrFill.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace
{

View File

@ -4,7 +4,7 @@
#include <AggregateFunctions/Helpers.h>
#include <Core/Field.h>
#include "registerAggregateFunctions.h"
namespace DB
{
@ -84,7 +84,8 @@ static constexpr bool supportBigInt()
}
template <template <typename, bool> class Function>
AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
AggregateFunctionPtr createAggregateFunctionQuantile(
const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
/// Second argument type check doesn't depend on the type of the first one.
Function<void, true>::assertSecondArg(argument_types);

View File

@ -1,13 +1,12 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionRankCorrelation.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
#include <AggregateFunctions/Helpers.h>
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
namespace DB
@ -17,7 +16,8 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionRankCorrelation(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionRankCorrelation(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionResample.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionRetention.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -6,7 +6,6 @@
#include <DataTypes/DataTypeDateTime.h>
#include <ext/range.h>
#include "registerAggregateFunctions.h"
namespace DB
{
@ -23,8 +22,9 @@ namespace ErrorCodes
namespace
{
template <template <typename, typename> class AggregateFunction, template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionSequenceBase(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
template <template <typename, typename> typename AggregateFunction, template <typename> typename Data>
AggregateFunctionPtr createAggregateFunctionSequenceBase(
const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
if (params.size() != 1)
throw Exception{"Aggregate function " + name + " requires exactly one parameter.",

View File

@ -2,17 +2,10 @@
#include <AggregateFunctions/AggregateFunctionMerge.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
@ -44,36 +37,4 @@ void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorState>());
}
DataTypePtr AggregateFunctionState::getReturnType() const
{
auto ptr = std::make_shared<DataTypeAggregateFunction>(nested_func, arguments, params);
/// Special case: it is -MergeState combinator.
/// We must return AggregateFunction(agg, ...) instead of AggregateFunction(aggMerge, ...)
if (typeid_cast<const AggregateFunctionMerge *>(ptr->getFunction().get()))
{
if (arguments.size() != 1)
throw Exception("Combinator -MergeState expects only one argument", ErrorCodes::BAD_ARGUMENTS);
if (!typeid_cast<const DataTypeAggregateFunction *>(arguments[0].get()))
throw Exception("Combinator -MergeState expects argument with AggregateFunction type", ErrorCodes::BAD_ARGUMENTS);
return arguments[0];
}
if (!arguments.empty())
{
DataTypePtr argument_type_ptr = arguments[0];
WhichDataType which(*argument_type_ptr);
if (which.idx == TypeIndex::AggregateFunction)
{
if (arguments.size() != 1)
throw Exception("Nested aggregation expects only one argument", ErrorCodes::BAD_ARGUMENTS);
return arguments[0];
}
}
return ptr;
}
}

View File

@ -33,7 +33,15 @@ public:
return nested_func->getName() + "State";
}
DataTypePtr getReturnType() const override;
DataTypePtr getReturnType() const override
{
return getStateType();
}
DataTypePtr getStateType() const override
{
return nested_func->getStateType();
}
void create(AggregateDataPtr __restrict place) const override
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/AggregateFunctionStatistics.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,8 +16,9 @@ namespace ErrorCodes
namespace
{
template <template <typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
@ -31,8 +31,9 @@ AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string &
return res;
}
template <template <typename, typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename, typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertBinary(name, argument_types);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/AggregateFunctionStatisticsSimple.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,8 +16,9 @@ namespace ErrorCodes
namespace
{
template <template <typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
@ -36,8 +36,9 @@ AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string &
return res;
}
template <template <typename, typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename, typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertBinary(name, argument_types);

View File

@ -3,8 +3,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Moments.h>
#include "registerAggregateFunctions.h"
namespace ErrorCodes
{
@ -57,7 +55,8 @@ struct StudentTTestData : public TTestMoments<Float64>
}
};
AggregateFunctionPtr createAggregateFunctionStudentTTest(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionStudentTTest(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionSum.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -2,7 +2,7 @@
#include <AggregateFunctions/AggregateFunctionSumCount.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -4,7 +4,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -11,12 +11,12 @@
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
@ -38,12 +38,6 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
@ -86,12 +80,6 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
/// We use exact hash function if the user wants it;
/// or if the arguments are not contiguous in memory, because only exact hash function have support for this case.
bool use_exact_hash_function = is_exact || !isAllArgumentsContiguousInMemory(argument_types);

View File

@ -13,13 +13,13 @@
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
@ -105,11 +105,6 @@ namespace
if (argument_types.empty())
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
switch (precision)
{

View File

@ -10,13 +10,13 @@
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
@ -47,11 +47,6 @@ AggregateFunctionPtr createAggregateFunctionUniqUpTo(const std::string & name, c
if (argument_types.empty())
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);

View File

@ -3,8 +3,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Moments.h>
#include "registerAggregateFunctions.h"
namespace ErrorCodes
{
@ -54,7 +52,8 @@ struct WelchTTestData : public TTestMoments<Float64>
}
};
AggregateFunctionPtr createAggregateFunctionWelchTTest(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionWelchTTest(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);

View File

@ -2,6 +2,7 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_aggregate_functions .)
list(REMOVE_ITEM clickhouse_aggregate_functions_sources
IAggregateFunction.cpp
AggregateFunctionFactory.cpp
AggregateFunctionCombinatorFactory.cpp
AggregateFunctionCount.cpp

View File

@ -0,0 +1,13 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <DataTypes/DataTypeAggregateFunction.h>
namespace DB
{
DataTypePtr IAggregateFunction::getStateType() const
{
return std::make_shared<DataTypeAggregateFunction>(shared_from_this(), argument_types, parameters);
}
}

View File

@ -38,7 +38,7 @@ using AggregateDataPtr = char *;
using ConstAggregateDataPtr = const char *;
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;
struct AggregateFunctionProperties;
/** Aggregate functions interface.
@ -49,7 +49,7 @@ struct AggregateFunctionProperties;
* (which can be created in some memory pool),
* and IAggregateFunction is the external interface for manipulating them.
*/
class IAggregateFunction
class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunction>
{
public:
IAggregateFunction(const DataTypes & argument_types_, const Array & parameters_)
@ -61,6 +61,9 @@ public:
/// Get the result type.
virtual DataTypePtr getReturnType() const = 0;
/// Get the data type of internal state. By default it is AggregateFunction(name(params), argument_types...).
virtual DataTypePtr getStateType() const;
/// Get type which will be used for prediction result in case if function is an ML method.
virtual DataTypePtr getReturnTypeToPredict() const
{
@ -236,9 +239,7 @@ public:
// aggregate functions implement IWindowFunction interface and so on. This
// would be more logically correct, but more complex. We only have a handful
// of true window functions, so this hack-ish interface suffices.
virtual IWindowFunction * asWindowFunction() { return nullptr; }
virtual const IWindowFunction * asWindowFunction() const
{ return const_cast<IAggregateFunction *>(this)->asWindowFunction(); }
virtual bool isOnlyWindowFunction() const { return false; }
protected:
DataTypes argument_types;

View File

@ -61,6 +61,7 @@ SRCS(
AggregateFunctionUniqUpTo.cpp
AggregateFunctionWelchTTest.cpp
AggregateFunctionWindowFunnel.cpp
IAggregateFunction.cpp
UniqCombinedBiasData.cpp
UniqVariadicHash.cpp
parseAggregateFunctionParameters.cpp

View File

@ -111,6 +111,7 @@ list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h Functions/FunctionsLogical.h)
list (APPEND dbms_sources
AggregateFunctions/IAggregateFunction.cpp
AggregateFunctions/AggregateFunctionFactory.cpp
AggregateFunctions/AggregateFunctionCombinatorFactory.cpp
AggregateFunctions/AggregateFunctionState.cpp

View File

@ -167,7 +167,7 @@ MutableColumnPtr ColumnAggregateFunction::predictValues(const ColumnsWithTypeAnd
MutableColumnPtr res = func->getReturnTypeToPredict()->createColumn();
res->reserve(data.size());
auto * machine_learning_function = func.get();
const auto * machine_learning_function = func.get();
if (machine_learning_function)
{
if (data.size() == 1)
@ -485,7 +485,7 @@ Arena & ColumnAggregateFunction::createOrGetArena()
}
static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Arena & arena, IAggregateFunction * func)
static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Arena & arena, const IAggregateFunction * func)
{
data.push_back(arena.alignedAlloc(func->sizeOfData(), func->alignOfData()));
try

View File

@ -83,10 +83,12 @@
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54448
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54449
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -100,7 +100,7 @@ bool DataTypeAggregateFunction::equals(const IDataType & rhs) const
SerializationPtr DataTypeAggregateFunction::doGetDefaultSerialization() const
{
return std::make_shared<SerializationAggregateFunction>(function);
return std::make_shared<SerializationAggregateFunction>(function, getName());
}

View File

@ -20,19 +20,16 @@ namespace DB
void SerializationAggregateFunction::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const String & s = get<const String &>(field);
writeVarUInt(s.size(), ostr);
writeString(s, ostr);
const AggregateFunctionStateData & state = get<const AggregateFunctionStateData &>(field);
writeBinary(state.data, ostr);
}
void SerializationAggregateFunction::deserializeBinary(Field & field, ReadBuffer & istr) const
{
UInt64 size;
readVarUInt(size, istr);
field = String();
String & s = get<String &>(field);
s.resize(size);
istr.readStrict(s.data(), size);
field = AggregateFunctionStateData();
AggregateFunctionStateData & s = get<AggregateFunctionStateData &>(field);
readBinary(s.data, istr);
s.name = type_name;
}
void SerializationAggregateFunction::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const

View File

@ -12,11 +12,13 @@ class SerializationAggregateFunction final : public ISerialization
{
private:
AggregateFunctionPtr function;
String type_name;
public:
static constexpr bool is_parametric = true;
SerializationAggregateFunction(const AggregateFunctionPtr & function_): function(function_) {}
SerializationAggregateFunction(const AggregateFunctionPtr & function_, String type_name_)
: function(function_), type_name(std::move(type_name_)) {}
/// NOTE These two functions for serializing single values are incompatible with the functions below.
void serializeBinary(const Field & field, WriteBuffer & ostr) const override;

View File

@ -55,7 +55,7 @@ public:
cells.resize_fill(cells_size);
size_overlap_mask = cells_size - 1;
setup(dictionary_structure);
createAttributes(dictionary_structure);
}
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return true; }
@ -226,23 +226,17 @@ private:
auto & attribute = attributes[attribute_index];
const auto & default_value_provider = fetch_request.defaultValueProviderAtIndex(attribute_index);
size_t fetched_keys_size = fetched_keys.size();
auto & fetched_column = *result.fetched_columns[attribute_index];
fetched_column.reserve(fetched_keys_size);
fetched_column.reserve(fetched_columns_index);
if (unlikely(attribute.is_complex_type))
if (unlikely(attribute.is_nullable))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
fetched_column.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
fetched_column.insert(container[fetched_key.element_index]);
}
getItemsForFetchedKeys<Field>(
attribute,
fetched_columns_index,
fetched_keys,
[&](Field & value) { fetched_column.insert(value); },
default_value_provider);
}
else
{
@ -250,46 +244,40 @@ private:
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
using ColumnType = typename ColumnProvider::ColumnType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnType =
std::conditional_t<std::is_same_v<AttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<ValueType>,
ColumnVector<AttributeType>>>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
ColumnType & column_typed = static_cast<ColumnType &>(fetched_column);
if constexpr (std::is_same_v<ColumnType, ColumnString>)
if constexpr (std::is_same_v<ValueType, Array>)
{
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
column_typed.insertData(item.data, item.size);
}
getItemsForFetchedKeys<ValueType>(
attribute,
fetched_columns_index,
fetched_keys,
[&](Array & value) { fetched_column.insert(value); },
default_value_provider);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
getItemsForFetchedKeys<ValueType>(
attribute,
fetched_columns_index,
fetched_keys,
[&](StringRef value) { fetched_column.insertData(value.data, value.size); },
default_value_provider);
}
else
{
auto & data = column_typed.getData();
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
data.push_back(item);
}
}
getItemsForFetchedKeys<ValueType>(
attribute,
fetched_columns_index,
fetched_keys,
[&](auto value) { data.push_back(value); },
default_value_provider);
}
};
@ -339,7 +327,9 @@ private:
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
{
container.back() = column_value;
}
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & string_value = column_value.get<String>();
@ -348,7 +338,9 @@ private:
container.back() = inserted_value;
}
else
{
container.back() = column_value.get<NearestFieldType<ElementType>>();
}
});
}
@ -382,7 +374,9 @@ private:
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
{
container[index_to_use] = column_value;
}
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & string_value = column_value.get<String>();
@ -398,7 +392,9 @@ private:
container[index_to_use] = inserted_value;
}
else
{
container[index_to_use] = column_value.get<NearestFieldType<ElementType>>();
}
});
}
}
@ -504,9 +500,9 @@ private:
auto & attribute = attributes[attribute_index];
auto & attribute_type = attribute.type;
if (unlikely(attribute.is_complex_type))
if (unlikely(attribute.is_nullable))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
auto & container = std::get<ContainerType<Field>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
}
else
@ -517,7 +513,7 @@ private:
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
auto & container = std::get<ContainerType<ValueType>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
};
@ -541,7 +537,82 @@ private:
return updated_value;
}
void setup(const DictionaryStructure & dictionary_structure)
template<typename ValueType>
using ContainerType = std::conditional_t<
std::is_same_v<ValueType, Field> || std::is_same_v<ValueType, Array>,
std::vector<ValueType>,
PaddedPODArray<ValueType>>;
struct Attribute
{
AttributeUnderlyingType type;
bool is_nullable;
std::variant<
ContainerType<UInt8>,
ContainerType<UInt16>,
ContainerType<UInt32>,
ContainerType<UInt64>,
ContainerType<UInt128>,
ContainerType<UInt256>,
ContainerType<Int8>,
ContainerType<Int16>,
ContainerType<Int32>,
ContainerType<Int64>,
ContainerType<Int128>,
ContainerType<Int256>,
ContainerType<Decimal32>,
ContainerType<Decimal64>,
ContainerType<Decimal128>,
ContainerType<Decimal256>,
ContainerType<Float32>,
ContainerType<Float64>,
ContainerType<UUID>,
ContainerType<StringRef>,
ContainerType<Array>,
ContainerType<Field>> attribute_container;
};
template <typename ValueType, typename ValueSetter>
void getItemsForFetchedKeys(
Attribute & attribute,
size_t fetched_keys_size,
PaddedPODArray<FetchedKey> & fetched_keys,
ValueSetter && value_setter,
const DefaultValueProvider & default_value_provider)
{
auto & container = std::get<ContainerType<ValueType>>(attribute.attribute_container);
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys_size; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
{
auto default_value = default_value_provider.getDefaultValue(fetched_key_index);
if constexpr (std::is_same_v<ValueType, Field>)
{
value_setter(default_value);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto & value = default_value.get<String>();
value_setter(value);
}
else
{
value_setter(default_value.get<ValueType>());
}
}
else
{
value_setter(container[fetched_key.element_index]);
}
}
}
void createAttributes(const DictionaryStructure & dictionary_structure)
{
/// For each dictionary attribute create storage attribute
/// For simple attributes create PODArray, for complex vector of Fields
@ -561,12 +632,12 @@ private:
attributes.emplace_back();
auto & last_attribute = attributes.back();
last_attribute.type = attribute_type;
last_attribute.is_complex_type = dictionary_attribute.is_nullable || dictionary_attribute.is_array;
last_attribute.is_nullable = dictionary_attribute.is_nullable;
if (dictionary_attribute.is_nullable)
last_attribute.attribute_container = std::vector<Field>();
last_attribute.attribute_container = ContainerType<Field>();
else
last_attribute.attribute_container = PaddedPODArray<ValueType>();
last_attribute.attribute_container = ContainerType<ValueType>();
};
callOnDictionaryAttributeType(attribute_type, type_call);
@ -583,35 +654,6 @@ private:
time_t deadline;
};
struct Attribute
{
AttributeUnderlyingType type;
bool is_complex_type;
std::variant<
PaddedPODArray<UInt8>,
PaddedPODArray<UInt16>,
PaddedPODArray<UInt32>,
PaddedPODArray<UInt64>,
PaddedPODArray<UInt128>,
PaddedPODArray<UInt256>,
PaddedPODArray<Int8>,
PaddedPODArray<Int16>,
PaddedPODArray<Int32>,
PaddedPODArray<Int64>,
PaddedPODArray<Int128>,
PaddedPODArray<Int256>,
PaddedPODArray<Decimal32>,
PaddedPODArray<Decimal64>,
PaddedPODArray<Decimal128>,
PaddedPODArray<Decimal256>,
PaddedPODArray<Float32>,
PaddedPODArray<Float64>,
PaddedPODArray<UUID>,
PaddedPODArray<StringRef>,
std::vector<Field>> attribute_container;
};
CacheDictionaryStorageConfiguration configuration;
pcg64 rnd_engine;

View File

@ -93,7 +93,7 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - 1;
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - configuration.update_lag;
std::string str_time = DateLUT::instance().timeToString(hr_time);
update_time = std::chrono::system_clock::now();
return query_builder.composeUpdateQuery(configuration.update_field, str_time);
@ -222,7 +222,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
ClickHouseDictionarySource::Configuration configuration {
ClickHouseDictionarySource::Configuration configuration
{
.secure = config.getBool(settings_config_prefix + ".secure", false),
.host = host,
.port = port,
@ -231,8 +232,9 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table"),
.where = config.getString(settings_config_prefix + ".where", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.is_local = isLocalAddress({host, port}, default_port)
};

View File

@ -28,8 +28,9 @@ public:
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
const std::string invalidate_query;
const std::string update_field;
const UInt64 update_lag;
const bool is_local;
};

View File

@ -6,8 +6,11 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Core/Block.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
@ -231,14 +234,27 @@ class DictionaryAttributeColumnProvider
{
public:
using ColumnType =
std::conditional_t<std::is_same_v<DictionaryAttributeType, Array>, ColumnArray,
std::conditional_t<std::is_same_v<DictionaryAttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<DictionaryAttributeType>, ColumnDecimal<DictionaryAttributeType>,
ColumnVector<DictionaryAttributeType>>>;
ColumnVector<DictionaryAttributeType>>>>;
using ColumnPtr = typename ColumnType::MutablePtr;
static ColumnPtr getColumn(const DictionaryAttribute & dictionary_attribute, size_t size)
{
if constexpr (std::is_same_v<DictionaryAttributeType, Array>)
{
if (const auto * array_type = typeid_cast<const DataTypeArray *>(dictionary_attribute.type.get()))
{
auto nested_column = array_type->getNestedType()->createColumn();
return ColumnArray::create(std::move(nested_column));
}
else
{
throw Exception(ErrorCodes::TYPE_MISMATCH, "Unsupported attribute type.");
}
}
if constexpr (std::is_same_v<DictionaryAttributeType, String>)
{
return ColumnType::create();
@ -249,7 +265,8 @@ public:
}
else if constexpr (IsDecimalNumber<DictionaryAttributeType>)
{
auto scale = getDecimalScale(*dictionary_attribute.nested_type);
auto nested_type = removeNullable(dictionary_attribute.type);
auto scale = getDecimalScale(*nested_type);
return ColumnType::create(size, scale);
}
else if constexpr (is_arithmetic_v<DictionaryAttributeType>)
@ -280,18 +297,18 @@ public:
: default_value(std::move(attribute_default_value))
{
if (default_values_column_ == nullptr)
use_default_value_from_column = false;
use_attribute_default_value = true;
else
{
if (const auto * const default_col = checkAndGetColumn<DefaultColumnType>(*default_values_column_))
{
default_values_column = default_col;
use_default_value_from_column = true;
use_attribute_default_value = false;
}
else if (const auto * const default_col_const = checkAndGetColumnConst<DefaultColumnType>(default_values_column_.get()))
{
default_value = default_col_const->template getValue<DictionaryAttributeType>();
use_default_value_from_column = false;
use_attribute_default_value = true;
}
else
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type of default column is not the same as dictionary attribute type.");
@ -300,12 +317,17 @@ public:
DefaultValueType operator[](size_t row)
{
if (!use_default_value_from_column)
if (use_attribute_default_value)
return static_cast<DefaultValueType>(default_value);
assert(default_values_column != nullptr);
if constexpr (std::is_same_v<DefaultColumnType, ColumnString>)
if constexpr (std::is_same_v<DefaultColumnType, ColumnArray>)
{
Field field = (*default_values_column)[row];
return field.get<Array>();
}
else if constexpr (std::is_same_v<DefaultColumnType, ColumnString>)
return default_values_column->getDataAt(row);
else
return default_values_column->getData()[row];
@ -313,7 +335,7 @@ public:
private:
DictionaryAttributeType default_value;
const DefaultColumnType * default_values_column = nullptr;
bool use_default_value_from_column = false;
bool use_attribute_default_value = false;
};
template <DictionaryKeyType key_type>

View File

@ -25,9 +25,10 @@ namespace
Block block;
if (dict_struct.id)
{
block.insert(ColumnWithTypeAndName{ColumnUInt64::create(1, 0), std::make_shared<DataTypeUInt64>(), dict_struct.id->name});
if (dict_struct.key)
}
else if (dict_struct.key)
{
for (const auto & attribute : *dict_struct.key)
{

View File

@ -79,9 +79,7 @@ AttributeUnderlyingType getAttributeUnderlyingType(const DataTypePtr & type)
case TypeIndex::String: return AttributeUnderlyingType::String;
// Temporary hack to allow arrays in keys, since they are never retrieved for polygon dictionaries.
// TODO: This should be fixed by fully supporting arrays in dictionaries.
case TypeIndex::Array: return AttributeUnderlyingType::String;
case TypeIndex::Array: return AttributeUnderlyingType::Array;
default: break;
}
@ -125,7 +123,7 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
id.emplace(config, structure_prefix + ".id");
else if (has_key)
{
key.emplace(getAttributes(config, structure_prefix + ".key", true));
key.emplace(getAttributes(config, structure_prefix + ".key", /*complex_key_attributes =*/ true));
if (key->empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty 'key' supplied");
}
@ -173,7 +171,7 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
has_expressions = true;
}
attributes = getAttributes(config, structure_prefix, false);
attributes = getAttributes(config, structure_prefix, /*complex_key_attributes =*/ false);
for (size_t i = 0; i < attributes.size(); ++i)
{
@ -375,17 +373,10 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
const auto type_string = config.getString(prefix + "type");
const auto initial_type = DataTypeFactory::instance().get(type_string);
auto type = initial_type;
bool is_array = false;
bool is_nullable = false;
bool is_nullable = initial_type->isNullable();
if (type->isNullable())
{
is_nullable = true;
type = removeNullable(type);
}
const auto underlying_type = getAttributeUnderlyingType(type);
auto non_nullable_type = removeNullable(initial_type);
const auto underlying_type = getAttributeUnderlyingType(non_nullable_type);
const auto expression = config.getString(prefix + "expression", "");
if (!expression.empty())
@ -394,26 +385,27 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
Field null_value;
if (allow_null_values)
{
/// TODO: Fix serialization for nullable type.
const auto null_value_string = config.getString(prefix + "null_value");
try
{
if (null_value_string.empty())
{
null_value = type->getDefault();
null_value = non_nullable_type->getDefault();
}
else
{
ReadBufferFromString null_value_buffer{null_value_string};
auto column_with_null_value = type->createColumn();
type->getDefaultSerialization()->deserializeTextEscaped(*column_with_null_value, null_value_buffer, format_settings);
auto column_with_null_value = non_nullable_type->createColumn();
non_nullable_type->getDefaultSerialization()->deserializeTextEscaped(*column_with_null_value, null_value_buffer, format_settings);
null_value = (*column_with_null_value)[0];
}
}
catch (Exception & e)
{
String dictionary_name = config.getString(".dictionary.name", "");
e.addMessage("While parsing null_value for attribute with name " + name
+ " in dictionary " + dictionary_name);
e.addMessage(fmt::format("While parsing null_value for attribute with name {} in dictionary {}", name, dictionary_name));
throw;
}
}
@ -436,15 +428,12 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
name,
underlying_type,
initial_type,
initial_type->getDefaultSerialization(),
type,
expression,
null_value,
hierarchical,
injective,
is_object_id,
is_nullable,
is_array});
is_nullable});
}
return res_attributes;

View File

@ -39,6 +39,7 @@
M(Decimal256) \
M(UUID) \
M(String) \
M(Array) \
namespace DB
@ -74,15 +75,12 @@ struct DictionaryAttribute final
const std::string name;
const AttributeUnderlyingType underlying_type;
const DataTypePtr type;
const SerializationPtr serialization;
const DataTypePtr nested_type;
const std::string expression;
const Field null_value;
const bool hierarchical;
const bool injective;
const bool is_object_id;
const bool is_nullable;
const bool is_array;
};
template <typename Type>
@ -92,7 +90,7 @@ struct DictionaryAttributeType
};
template <typename F>
void callOnDictionaryAttributeType(AttributeUnderlyingType type, F&& func)
void callOnDictionaryAttributeType(AttributeUnderlyingType type, F && func)
{
switch (type)
{

View File

@ -60,16 +60,12 @@ namespace
ExecutableDictionarySource::ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
Block & sample_block_,
ContextConstPtr context_)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, dict_struct{dict_struct_}
, implicit_key{config.getBool(config_prefix + ".implicit_key", false)}
, command{config.getString(config_prefix + ".command")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, dict_struct(dict_struct_)
, configuration(configuration_)
, sample_block{sample_block_}
, context(context_)
{
@ -77,7 +73,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(
/// these columns will not be returned from source
/// Implicit key means that the source script will return only values,
/// and the correspondence to the requested keys is determined implicitly - by the order of rows in the result.
if (implicit_key)
if (configuration.implicit_key)
{
auto keys_names = dict_struct.getKeysNames();
@ -91,43 +87,40 @@ ExecutableDictionarySource::ExecutableDictionarySource(
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, implicit_key{other.implicit_key}
, command{other.command}
, update_field{other.update_field}
, format{other.format}
, sample_block{other.sample_block}
, update_time(other.update_time)
, dict_struct(other.dict_struct)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
{
}
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
{
if (implicit_key)
if (configuration.implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method");
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(command);
auto input_stream = context->getInputFormat(format, process->out, sample_block, max_block_size);
auto process = ShellCommand::execute(configuration.command);
auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
{
if (implicit_key)
if (configuration.implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method");
time_t new_update_time = time(nullptr);
SCOPE_EXIT(update_time = new_update_time);
std::string command_with_update_field = command;
std::string command_with_update_field = configuration.command;
if (update_time)
command_with_update_field += " " + update_field + " " + DB::toString(LocalDateTime(update_time - 1));
command_with_update_field += " " + configuration.update_field + " " + DB::toString(LocalDateTime(update_time - configuration.update_lag));
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
auto process = ShellCommand::execute(command_with_update_field);
auto input_stream = context->getInputFormat(format, process->out, sample_block, max_block_size);
auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
@ -220,15 +213,15 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col
BlockInputStreamPtr ExecutableDictionarySource::getStreamForBlock(const Block & block)
{
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
context, format, sample_block, command, log,
context, configuration.format, sample_block, configuration.command, log,
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context->getOutputStream(format, out, block.cloneEmpty());
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
out.close();
});
if (implicit_key)
if (configuration.implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
@ -246,7 +239,7 @@ bool ExecutableDictionarySource::supportsSelectiveLoad() const
bool ExecutableDictionarySource::hasUpdateField() const
{
return !update_field.empty();
return !configuration.update_field.empty();
}
DictionarySourcePtr ExecutableDictionarySource::clone() const
@ -256,7 +249,7 @@ DictionarySourcePtr ExecutableDictionarySource::clone() const
std::string ExecutableDictionarySource::toString() const
{
return "Executable: " + command;
return "Executable: " + configuration.command;
}
void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
@ -280,10 +273,20 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
return std::make_unique<ExecutableDictionarySource>(
dict_struct, config, config_prefix + ".executable",
sample_block, context_local_copy);
std::string settings_config_prefix = config_prefix + ".executable";
ExecutableDictionarySource::Configuration configuration
{
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
.command = config.getString(settings_config_prefix + ".command"),
.format = config.getString(settings_config_prefix + ".format"),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
};
return std::make_unique<ExecutableDictionarySource>(dict_struct, configuration, sample_block, context_local_copy);
};
factory.registerSource("executable", create_table_source);
}

View File

@ -15,10 +15,19 @@ namespace DB
class ExecutableDictionarySource final : public IDictionarySource
{
public:
struct Configuration
{
bool implicit_key;
const std::string command;
const std::string format;
const std::string update_field;
const UInt64 update_lag;
};
ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
Block & sample_block_,
ContextConstPtr context_);
@ -53,10 +62,7 @@ private:
Poco::Logger * log;
time_t update_time = 0;
const DictionaryStructure dict_struct;
bool implicit_key;
const std::string command;
const std::string update_field;
const std::string format;
const Configuration configuration;
Block sample_block;
ContextConstPtr context;
};

View File

@ -70,12 +70,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP
BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadAll method");
}
BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadUpdatedAll method");
}
namespace
@ -254,7 +254,7 @@ bool ExecutablePoolDictionarySource::supportsSelectiveLoad() const
bool ExecutablePoolDictionarySource::hasUpdateField() const
{
return !configuration.update_field.empty();
return false;
}
DictionarySourcePtr ExecutablePoolDictionarySource::clone() const
@ -295,9 +295,9 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
settings_no_parallel_parsing.input_format_parallel_parsing = false;
context_local_copy->setSettings(settings_no_parallel_parsing);
String configuration_config_prefix = config_prefix + ".executable_pool";
String settings_config_prefix = config_prefix + ".executable_pool";
size_t max_command_execution_time = config.getUInt64(configuration_config_prefix + ".max_command_execution_time", 10);
size_t max_command_execution_time = config.getUInt64(settings_config_prefix + ".max_command_execution_time", 10);
size_t max_execution_time_seconds = static_cast<size_t>(context->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
@ -305,12 +305,11 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
ExecutablePoolDictionarySource::Configuration configuration
{
.command = config.getString(configuration_config_prefix + ".command"),
.format = config.getString(configuration_config_prefix + ".format"),
.pool_size = config.getUInt64(configuration_config_prefix + ".size"),
.update_field = config.getString(configuration_config_prefix + ".update_field", ""),
.implicit_key = config.getBool(configuration_config_prefix + ".implicit_key", false),
.command_termination_timeout = config.getUInt64(configuration_config_prefix + ".command_termination_timeout", 10),
.command = config.getString(settings_config_prefix + ".command"),
.format = config.getString(settings_config_prefix + ".format"),
.pool_size = config.getUInt64(settings_config_prefix + ".size"),
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
.command_termination_timeout = config.getUInt64(settings_config_prefix + ".command_termination_timeout", 10),
.max_command_execution_time = max_command_execution_time
};

View File

@ -32,7 +32,6 @@ public:
const String command;
const String format;
const size_t pool_size;
const String update_field;
const bool implicit_key;
const size_t command_termination_timeout;
const size_t max_command_execution_time;

View File

@ -358,7 +358,8 @@ void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, cons
/// key_i=value_i
writeQuoted(key_description.name, out);
writeString("=", out);
key_description.serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
auto serialization = key_description.type->getDefaultSerialization();
serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
}
@ -415,7 +416,8 @@ void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const si
writeString(", ", out);
first = false;
(*dict_struct.key)[i].serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
auto serialization = (*dict_struct.key)[i].type->getDefaultSerialization();
serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
writeString(")", out);

View File

@ -70,17 +70,27 @@ ColumnPtr FlatDictionary::getColumn(
using ValueType = DictionaryValueType<AttributeType>;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
const auto attribute_null_value = std::get<ValueType>(attribute.null_values);
const auto & attribute_null_value = std::get<ValueType>(attribute.null_values);
AttributeType null_value = static_cast<AttributeType>(attribute_null_value);
DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(std::move(null_value), default_values_column);
auto column = ColumnProvider::getColumn(dictionary_attribute, size);
if constexpr (std::is_same_v<ValueType, StringRef>)
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
getItemsImpl<ValueType, ValueType>(
getItemsImpl<ValueType>(
attribute,
ids,
[&](const size_t, const Array & value) { out->insert(value); },
default_value_extractor);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();
getItemsImpl<ValueType>(
attribute,
ids,
[&](const size_t, const StringRef value) { out->insertData(value.data, value.size); },
@ -90,7 +100,7 @@ ColumnPtr FlatDictionary::getColumn(
{
auto & out = column->getData();
getItemsImpl<ValueType, ValueType>(
getItemsImpl<ValueType>(
attribute,
ids,
[&](const size_t row, const auto value) { out[row] = value; },
@ -275,6 +285,7 @@ void FlatDictionary::blockToAttributes(const Block & block)
if (already_processed_keys.find(key) != nullptr)
continue;
already_processed_keys.insert(key);
setAttributeValue(attribute, key, attribute_column[i]);
@ -352,7 +363,18 @@ void FlatDictionary::calculateBytesAllocated()
using ValueType = DictionaryValueType<AttributeType>;
const auto & container = std::get<ContainerType<ValueType>>(attribute.container);
bytes_allocated += sizeof(PaddedPODArray<ValueType>) + container.allocated_bytes();
bytes_allocated += sizeof(ContainerType<ValueType>);
if constexpr (std::is_same_v<ValueType, Array>)
{
/// It is not accurate calculations
bytes_allocated += sizeof(Array) * container.size();
}
else
{
bytes_allocated += container.allocated_bytes();
}
bucket_count = container.capacity();
if constexpr (std::is_same_v<ValueType, StringRef>)
@ -396,7 +418,7 @@ FlatDictionary::Attribute FlatDictionary::createAttribute(const DictionaryAttrib
return attribute;
}
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
template <typename AttributeType, typename ValueSetter, typename DefaultValueExtractor>
void FlatDictionary::getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<UInt64> & keys,
@ -414,7 +436,7 @@ void FlatDictionary::getItemsImpl(
if (key < loaded_keys.size() && loaded_keys[key])
{
set_value(row, static_cast<OutputType>(container[key]));
set_value(row, container[key]);
++keys_found;
}
else
@ -440,6 +462,10 @@ void FlatDictionary::resize(Attribute & attribute, UInt64 key)
{
const size_t elements_count = key + 1; //id=0 -> elements_count=1
loaded_keys.resize(elements_count, false);
if constexpr (std::is_same_v<T, Array>)
container.resize(elements_count, std::get<T>(attribute.null_values));
else
container.resize_fill(elements_count, std::get<T>(attribute.null_values));
}
}
@ -461,13 +487,13 @@ void FlatDictionary::setAttributeValueImpl<String>(Attribute & attribute, UInt64
void FlatDictionary::setAttributeValue(Attribute & attribute, const UInt64 key, const Field & value)
{
auto type_call = [&](const auto &dictionary_attribute_type)
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ResizeType = std::conditional_t<std::is_same_v<AttributeType, String>, StringRef, AttributeType>;
using ValueType = DictionaryValueType<AttributeType>;
resize<ResizeType>(attribute, key);
resize<ValueType>(attribute, key);
if (attribute.nullable_set)
{

View File

@ -106,7 +106,7 @@ public:
private:
template <typename Value>
using ContainerType = PaddedPODArray<Value>;
using ContainerType = std::conditional_t<std::is_same_v<Value, Array>, std::vector<Value>, PaddedPODArray<Value>>;
using NullableSet = HashSet<UInt64, DefaultHash<UInt64>>;
@ -135,8 +135,10 @@ private:
Float32,
Float64,
UUID,
StringRef>
StringRef,
Array>
null_values;
std::variant<
ContainerType<UInt8>,
ContainerType<UInt16>,
@ -157,7 +159,8 @@ private:
ContainerType<Float32>,
ContainerType<Float64>,
ContainerType<UUID>,
ContainerType<StringRef>>
ContainerType<StringRef>,
ContainerType<Array>>
container;
std::unique_ptr<Arena> string_arena;
@ -172,7 +175,7 @@ private:
Attribute createAttribute(const DictionaryAttribute& attribute, const Field & null_value);
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
template <typename AttributeType, typename ValueSetter, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<UInt64> & keys,

View File

@ -28,58 +28,32 @@ static const UInt64 max_block_size = 8192;
HTTPDictionarySource::HTTPDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
const Poco::Net::HTTPBasicCredentials & credentials_,
Block & sample_block_,
ContextConstPtr context_,
bool created_from_ddl)
: log(&Poco::Logger::get("HTTPDictionarySource"))
, update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, url{config.getString(config_prefix + ".url", "")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, sample_block{sample_block_}
, update_time(std::chrono::system_clock::from_time_t(0))
, dict_struct(dict_struct_)
, configuration(configuration_)
, sample_block(sample_block_)
, context(context_)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
{
if (created_from_ddl)
context->getRemoteHostFilter().checkURL(Poco::URI(url));
context->getRemoteHostFilter().checkURL(Poco::URI(configuration.url));
const auto & credentials_prefix = config_prefix + ".credentials";
if (config.has(credentials_prefix))
{
credentials.setUsername(config.getString(credentials_prefix + ".user", ""));
credentials.setPassword(config.getString(credentials_prefix + ".password", ""));
}
const auto & headers_prefix = config_prefix + ".headers";
if (config.has(headers_prefix))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(headers_prefix, config_keys);
header_entries.reserve(config_keys.size());
for (const auto & key : config_keys)
{
const auto header_key = config.getString(headers_prefix + "." + key + ".name", "");
const auto header_value = config.getString(headers_prefix + "." + key + ".value", "");
header_entries.emplace_back(std::make_tuple(header_key, header_value));
}
}
credentials.setUsername(credentials_.getUsername());
credentials.setPassword(credentials_.getPassword());
}
HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
: log(&Poco::Logger::get("HTTPDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, url{other.url}
, header_entries{other.header_entries}
, update_field{other.update_field}
, format{other.format}
, sample_block{other.sample_block}
, update_time(other.update_time)
, dict_struct(other.dict_struct)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
{
@ -89,11 +63,11 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer_ptr)
{
Poco::URI uri(url);
Poco::URI uri(configuration.url);
String http_request_compression_method_str = http_buffer_ptr->getCompressionMethod();
auto in_ptr_wrapped
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(uri.getPath(), http_request_compression_method_str));
auto input_stream = context->getInputFormat(format, *in_ptr_wrapped, sample_block, max_block_size);
auto input_stream = context->getInputFormat(configuration.format, *in_ptr_wrapped, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(in_ptr_wrapped));
}
@ -103,10 +77,10 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - configuration.update_lag;
WriteBufferFromOwnString out;
writeDateTimeText(hr_time, out);
uri.addQueryParameter(update_field, out.str());
uri.addQueryParameter(configuration.update_field, out.str());
}
else
{
@ -117,7 +91,7 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
BlockInputStreamPtr HTTPDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll {}", toString());
Poco::URI uri(url);
Poco::URI uri(configuration.url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_GET,
@ -126,13 +100,14 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
{
Poco::URI uri(url);
Poco::URI uri(configuration.url);
getUpdateFieldAndDate(uri);
LOG_TRACE(log, "loadUpdatedAll {}", uri.toString());
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
@ -143,7 +118,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
@ -156,11 +132,11 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context->getOutputStreamParallelIfPossible(format, out_buffer, sample_block);
auto output_stream = context->getOutputStreamParallelIfPossible(configuration.format, out_buffer, sample_block);
formatBlock(output_stream, block);
};
Poco::URI uri(url);
Poco::URI uri(configuration.url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
@ -169,7 +145,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
@ -182,11 +159,11 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context->getOutputStreamParallelIfPossible(format, out_buffer, sample_block);
auto output_stream = context->getOutputStreamParallelIfPossible(configuration.format, out_buffer, sample_block);
formatBlock(output_stream, block);
};
Poco::URI uri(url);
Poco::URI uri(configuration.url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
@ -195,7 +172,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
@ -211,7 +189,7 @@ bool HTTPDictionarySource::supportsSelectiveLoad() const
bool HTTPDictionarySource::hasUpdateField() const
{
return !update_field.empty();
return !configuration.update_field.empty();
}
DictionarySourcePtr HTTPDictionarySource::clone() const
@ -221,7 +199,7 @@ DictionarySourcePtr HTTPDictionarySource::clone() const
std::string HTTPDictionarySource::toString() const
{
Poco::URI uri(url);
Poco::URI uri(configuration.url);
return uri.toString();
}
@ -239,8 +217,44 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
return std::make_unique<HTTPDictionarySource>(
dict_struct, config, config_prefix + ".http", sample_block, context_local_copy, created_from_ddl);
const auto & settings_config_prefix = config_prefix + ".http";
const auto & credentials_prefix = settings_config_prefix + ".credentials";
Poco::Net::HTTPBasicCredentials credentials;
if (config.has(credentials_prefix))
{
credentials.setUsername(config.getString(credentials_prefix + ".user", ""));
credentials.setPassword(config.getString(credentials_prefix + ".password", ""));
}
const auto & headers_prefix = settings_config_prefix + ".headers";
ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
if (config.has(headers_prefix))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(headers_prefix, config_keys);
header_entries.reserve(config_keys.size());
for (const auto & key : config_keys)
{
const auto header_key = config.getString(headers_prefix + "." + key + ".name", "");
const auto header_value = config.getString(headers_prefix + "." + key + ".value", "");
header_entries.emplace_back(std::make_tuple(header_key, header_value));
}
}
auto configuration = HTTPDictionarySource::Configuration
{
.url = config.getString(settings_config_prefix + ".url", ""),
.format =config.getString(settings_config_prefix + ".format", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.header_entries = std::move(header_entries)
};
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context_local_copy, created_from_ddl);
};
factory.registerSource("http", create_table_source);
}

View File

@ -22,10 +22,20 @@ namespace DB
class HTTPDictionarySource final : public IDictionarySource
{
public:
struct Configuration
{
const std::string url;
const std::string format;
const std::string update_field;
const UInt64 update_lag;
const ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
};
HTTPDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration,
const Poco::Net::HTTPBasicCredentials & credentials_,
Block & sample_block_,
ContextConstPtr context_,
bool created_from_ddl);
@ -63,11 +73,8 @@ private:
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string url;
const Configuration configuration;
Poco::Net::HTTPBasicCredentials credentials;
ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
std::string update_field;
const std::string format;
Block sample_block;
ContextConstPtr context;
ConnectionTimeouts timeouts;

View File

@ -90,13 +90,26 @@ ColumnPtr HashedDictionary<dictionary_key_type, sparse>::getColumn(
using ValueType = DictionaryValueType<AttributeType>;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
const auto attribute_null_value = std::get<ValueType>(attribute.null_values);
const auto & attribute_null_value = std::get<ValueType>(attribute.null_values);
AttributeType null_value = static_cast<AttributeType>(attribute_null_value);
DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(std::move(null_value), default_values_column);
auto column = ColumnProvider::getColumn(dictionary_attribute, size);
if constexpr (std::is_same_v<ValueType, StringRef>)
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
getItemsImpl<ValueType>(
attribute,
extractor,
[&](const size_t, const Array & value) { out->insert(value); },
[&](const size_t)
{
},
default_value_extractor);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();

Some files were not shown because too many files have changed in this diff Show More