Merge branch 'master' into move-odbc-test

This commit is contained in:
Nikita Mikhaylov 2023-01-02 21:22:50 +01:00 committed by GitHub
commit b34fc8650c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
213 changed files with 4621 additions and 2444 deletions

View File

@ -107,8 +107,11 @@ fi
mv ./programs/clickhouse* /output
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
find . -name '*.so' -print -exec mv '{}' /output \;
find . -name '*.so.*' -print -exec mv '{}' /output \;
# Exclude cargo build directory since it may have some shared libraries
# (even though they are not required for the clickhouse binary)
find . -name '*.so' -not -path '*/cargo/*' -print -exec mv '{}' /output \;
find . -name '*.so.*' -not -path '*/cargo/*' -print -exec mv '{}' /output \;
prepare_combined_output () {
local OUTPUT

View File

@ -34,7 +34,14 @@ SETTINGS
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
```
Required parameters:
@ -46,13 +53,20 @@ Required parameters:
Optional parameters:
- `kafka_row_delimiter` — Delimiter character, which ends the message.
- `kafka_row_delimiter` — Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
- `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer (default: `0`). When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block).
- `kafka_num_consumers` — The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed. Default: `1`.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block. Default: `0`.
- `kafka_client_id` — Client identifier. Empty by default.
- `kafka_poll_timeout_ms` — Timeout for single poll from Kafka. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default, stream.
- `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
- `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`.
Examples:
@ -94,7 +108,7 @@ Do not use this method in new projects. If possible, switch old projects to the
``` sql
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
```
</details>
@ -193,6 +207,14 @@ Example:
- `_headers.name` — Array of message's headers keys.
- `_headers.value` — Array of message's headers values.
## Data formats support {#data-formats-support}
Kafka engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one Kafka message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one Kafka message can be controlled by setting `kafka_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
**See Also**
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

View File

@ -37,8 +37,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[nats_max_block_size = N,]
[nats_flush_interval_ms = N,]
[nats_username = 'user',]
[nats_password = 'password']
[redis_password = 'clickhouse']
[nats_password = 'password',]
[nats_token = 'clickhouse',]
[nats_startup_connect_tries = '5']
[nats_max_rows_per_message = 1]
```
Required parameters:
@ -49,7 +51,7 @@ Required parameters:
Optional parameters:
- `nats_row_delimiter` Delimiter character, which ends the message.
- `nats_row_delimiter` Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
- `nats_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `nats_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `nats_queue_group` Name for queue group of NATS subscribers. Default is the table name.
@ -57,11 +59,13 @@ Optional parameters:
- `nats_reconnect_wait` Amount of time in milliseconds to sleep between each reconnect attempt. Default: `5000`.
- `nats_server_list` - Server list for connection. Can be specified to connect to NATS cluster.
- `nats_skip_broken_messages` - NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` - Number of row collected by poll(s) for flushing data from NATS.
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS.
- `nats_max_block_size` - Number of row collected by poll(s) for flushing data from NATS. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `nats_username` - NATS username.
- `nats_password` - NATS password.
- `nats_token` - NATS auth token.
- `nats_startup_connect_tries` - Number of connect tries at startup. Default: `5`.
- `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. (default : `1`).
SSL connection:
@ -159,6 +163,14 @@ If you want to change the target table by using `ALTER`, we recommend disabling
## Virtual Columns {#virtual-columns}
- `_subject` - NATS message subject.
- `_subject` - NATS message subject.
## Data formats support {#data-formats-support}
NATS engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one NATS message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one NATS message can be controlled by setting `nats_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/nats/) <!--hide-->

View File

@ -37,8 +37,16 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish']
[rabbitmq_flush_interval_ms = N,]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
[rabbitmq_queue_consume = false,]
[rabbitmq_address = '',]
[rabbitmq_vhost = '/',]
[rabbitmq_queue_consume = false,]
[rabbitmq_username = '',]
[rabbitmq_password = '',]
[rabbitmq_commit_on_select = false,]
[rabbitmq_max_rows_per_message = 1]
```
Required parameters:
@ -49,19 +57,27 @@ Required parameters:
Optional parameters:
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` A comma-separated list of routing keys.
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
- `rabbitmq_queue_settings_list` - allows to set RabbitMQ settings when creating a queue. Available settings: `x-max-length`, `x-max-length-bytes`, `x-message-ttl`, `x-expires`, `x-priority`, `x-max-priority`, `x-overflow`, `x-dead-letter-exchange`, `x-queue-type`. The `durable` setting is enabled automatically for the queue.
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` A comma-separated list of routing keys.
- `rabbitmq_row_delimiter` Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. Default: `1`
- `rabbitmq_num_queues` Total number of queues. Increasing this number can significantly improve performance. Default: `1`.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `rabbitmq_max_block_size` - Number of row collected before flushing data from RabbitMQ. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `rabbitmq_flush_interval_ms` - Timeout for flushing data from RabbitMQ. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `rabbitmq_queue_settings_list` - allows to set RabbitMQ settings when creating a queue. Available settings: `x-max-length`, `x-max-length-bytes`, `x-message-ttl`, `x-expires`, `x-priority`, `x-max-priority`, `x-overflow`, `x-dead-letter-exchange`, `x-queue-type`. The `durable` setting is enabled automatically for the queue.
- `rabbitmq_address` - Address for connection. Use ether this setting or `rabbitmq_host_port`.
- `rabbitmq_vhost` - RabbitMQ vhost. Default: `'\'`.
- `rabbitmq_queue_consume` - Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings. Default: `false`.
- `rabbitmq_username` - RabbitMQ username.
- `rabbitmq_password` - RabbitMQ password.
- `rabbitmq_commit_on_select` - Commit messages when select query is made. Default: `false`.
- `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default : `1`.
SSL connection:
@ -166,11 +182,20 @@ Example:
## Virtual Columns {#virtual-columns}
- `_exchange_name` - RabbitMQ exchange name.
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - messageID of the received message; non-empty if was set, when message was published.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.
- `_exchange_name` - RabbitMQ exchange name.
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - messageID of the received message; non-empty if was set, when message was published.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.
## Data formats support {#data-formats-support}
RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one RabbitMQ message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one RabbitMQ message can be controlled by setting `rabbitmq_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/rabbitmq/) <!--hide-->

View File

@ -127,6 +127,13 @@ Default value: 100000.
A large number of parts in a table reduces performance of ClickHouse queries and increases ClickHouse boot time. Most often this is a consequence of an incorrect design (mistakes when choosing a partitioning strategy - too small partitions).
## simultaneous_parts_removal_limit {#simultaneous-parts-removal-limit}
If there are a lot of outdated parts cleanup thread will try to delete up to `simultaneous_parts_removal_limit` parts during one iteration.
`simultaneous_parts_removal_limit` set to `0` means unlimited.
Default value: 0.
## replicated_deduplication_window {#replicated-deduplication-window}
The number of most recently inserted blocks for which ClickHouse Keeper stores hash sums to check for duplicates.

View File

@ -1011,6 +1011,12 @@ The default value is 7500.
The smaller the value, the more often data is flushed into the table. Setting the value too low leads to poor performance.
## stream_poll_timeout_ms {#stream_poll_timeout_ms}
Timeout for polling data from/to streaming storages.
Default value: 500.
## load_balancing {#settings-load_balancing}
Specifies the algorithm of replicas selection that is used for distributed query processing.

View File

@ -0,0 +1,41 @@
---
slug: /en/sql-reference/aggregate-functions/reference/grouparraylast
sidebar_position: 110
---
# groupArrayLast
Syntax: `groupArrayLast(max_size)(x)`
Creates an array of last argument values.
For example, `groupArrayLast(1)(x)` is equivalent to `[anyLast (x)]`.
In some cases, you can still rely on the order of execution. This applies to cases when `SELECT` comes from a subquery that uses `ORDER BY`.
**Example**
Query:
```sql
select groupArrayLast(2)(number+1) numbers from numbers(10)
```
Result:
```text
┌─numbers─┐
│ [9,10] │
└─────────┘
```
In compare to `groupArray`:
```sql
select groupArray(2)(number+1) numbers from numbers(10)
```
```text
┌─numbers─┐
│ [1,2] │
└─────────┘
```

View File

@ -32,6 +32,7 @@ ClickHouse-specific aggregate functions:
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)
- [topKWeighted](../../../sql-reference/aggregate-functions/reference/topkweighted.md)
- [groupArray](../../../sql-reference/aggregate-functions/reference/grouparray.md)
- [groupArrayLast](../../../sql-reference/aggregate-functions/reference/grouparraylast.md)
- [groupUniqArray](../../../sql-reference/aggregate-functions/reference/groupuniqarray.md)
- [groupArrayInsertAt](../../../sql-reference/aggregate-functions/reference/grouparrayinsertat.md)
- [groupArrayMovingAvg](../../../sql-reference/aggregate-functions/reference/grouparraymovingavg.md)

View File

@ -1085,6 +1085,8 @@ SELECT timeSlots(toDateTime64('1980-12-12 21:01:02.1234', 4, 'UTC'), toDecimal64
Formats a Time according to the given Format string. Format is a constant expression, so you cannot have multiple formats for a single result column.
formatDateTime uses MySQL datetime format style, refer to https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format.
**Syntax**
``` sql
@ -1158,6 +1160,64 @@ Result:
└─────────────────────────────────────────────────────────────────────┘
```
**See Also**
- [formatDateTimeInJodaSyntax](##formatDateTimeInJodaSyntax)
## formatDateTimeInJodaSyntax
Similar to formatDateTime, except that it formats datetime in Joda style instead of MySQL style. Refer to https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html.
**Replacement fields**
Using replacement fields, you can define a pattern for the resulting string.
| Placeholder | Description | Presentation | Examples |
| ----------- | ----------- | ------------- | -------- |
| G | era | text | AD |
| C | century of era (>=0) | number | 20 |
| Y | year of era (>=0) | year | 1996 |
| x | weekyear(not supported yet) | year | 1996 |
| w | week of weekyear(not supported yet) | number | 27 |
| e | day of week | number | 2 |
| E | day of week | text | Tuesday; Tue |
| y | year | year | 1996 |
| D | day of year | number | 189 |
| M | month of year | month | July; Jul; 07 |
| d | day of month | number | 10 |
| a | halfday of day | text | PM |
| K | hour of halfday (0~11) | number | 0 |
| h | clockhour of halfday (1~12) | number | 12 |
| H | hour of day (0~23) | number | 0 |
| k | clockhour of day (1~24) | number | 24 |
| m | minute of hour | number | 30 |
| s | second of minute | number | 55 |
| S | fraction of second(not supported yet) | number | 978 |
| z | time zone(short name not supported yet) | text | Pacific Standard Time; PST |
| Z | time zone offset/id(not supported yet) | zone | -0800; -08:00; America/Los_Angeles |
| ' | escape for text | delimiter| |
| '' | single quote | literal | ' |
**Example**
Query:
``` sql
SELECT formatDateTimeInJodaSyntax(toDateTime('2010-01-04 12:34:56'), 'yyyy-MM-dd HH:mm:ss')
```
Result:
```
┌─formatDateTimeInJodaSyntax(toDateTime('2010-01-04 12:34:56'), 'yyyy-MM-dd HH:mm:ss')─┐
│ 2010-01-04 12:34:56 │
└─────────────────────────────────────────────────────────────────────────────────────────┘
```
## dateName
Returns specified part of date.
@ -1241,6 +1301,8 @@ Result:
Function converts Unix timestamp to a calendar date and a time of a day. When there is only a single argument of [Integer](../../sql-reference/data-types/int-uint.md) type, it acts in the same way as [toDateTime](../../sql-reference/functions/type-conversion-functions.md#todatetime) and return [DateTime](../../sql-reference/data-types/datetime.md) type.
FROM_UNIXTIME uses MySQL datetime format style, refer to https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format.
Alias: `fromUnixTimestamp`.
**Example:**
@ -1273,6 +1335,28 @@ SELECT FROM_UNIXTIME(1234334543, '%Y-%m-%d %R:%S') AS DateTime;
└─────────────────────┘
```
**See Also**
- [fromUnixTimestampInJodaSyntax](##fromUnixTimestampInJodaSyntax)
## fromUnixTimestampInJodaSyntax
Similar to FROM_UNIXTIME, except that it formats time in Joda style instead of MySQL style. Refer to https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html.
**Example:**
Query:
``` sql
SELECT fromUnixTimestampInJodaSyntax(1669804872, 'yyyy-MM-dd HH:mm:ss', 'UTC');
```
Result:
```
┌─fromUnixTimestampInJodaSyntax(1669804872, 'yyyy-MM-dd HH:mm:ss', 'UTC')─┐
│ 2022-11-30 10:41:12 │
└────────────────────────────────────────────────────────────────────────────┘
```
## toModifiedJulianDay
Converts a [Proleptic Gregorian calendar](https://en.wikipedia.org/wiki/Proleptic_Gregorian_calendar) date in text form `YYYY-MM-DD` to a [Modified Julian Day](https://en.wikipedia.org/wiki/Julian_day#Variants) number in Int32. This function supports date from `0000-01-01` to `9999-12-31`. It raises an exception if the argument cannot be parsed as a date, or the date is invalid.

View File

@ -1497,7 +1497,7 @@ formatRow(format, x, y, ...)
**Returned value**
- A formatted string (for text formats it's usually terminated with the new line character).
- A formatted string. (for text formats it's usually terminated with the new line character).
**Example**
@ -1521,9 +1521,39 @@ Result:
└──────────────────────────────────┘
```
**Note**: If format contains suffix/prefix, it will be written in each row.
**Example**
Query:
``` sql
SELECT formatRow('CustomSeparated', number, 'good')
FROM numbers(3)
SETTINGS format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>'
```
Result:
``` text
┌─formatRow('CustomSeparated', number, 'good')─┐
<prefix>
0 good
<suffix>
<prefix>
1 good
<suffix>
<prefix>
2 good
<suffix>
└──────────────────────────────────────────────┘
```
Note: Only row-based formats are supported in this function.
## formatRowNoNewline
Converts arbitrary expressions into a string via given format. The function trims the last `\n` if any.
Converts arbitrary expressions into a string via given format. Differs from formatRow in that this function trims the last `\n` if any.
**Syntax**

View File

@ -1316,7 +1316,7 @@ formatRow(format, x, y, ...)
**Возвращаемое значение**
- Отформатированная строка (в текстовых форматах обычно с завершающим переводом строки).
- Отформатированная строка. (в текстовых форматах обычно с завершающим переводом строки).
**Пример**
@ -1340,9 +1340,39 @@ FROM numbers(3);
└──────────────────────────────────┘
```
**Примечание**: если формат содержит префикс/суффикс, то он будет записан в каждой строке.
**Пример**
Запрос:
``` sql
SELECT formatRow('CustomSeparated', number, 'good')
FROM numbers(3)
SETTINGS format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>'
```
Результат:
``` text
┌─formatRow('CustomSeparated', number, 'good')─┐
<prefix>
0 good
<suffix>
<prefix>
1 good
<suffix>
<prefix>
2 good
<suffix>
└──────────────────────────────────────────────┘
```
**Примечание**: данная функция поддерживает только строковые форматы вывода.
## formatRowNoNewline {#formatrownonewline}
Преобразует произвольные выражения в строку заданного формата. При этом удаляет лишние переводы строк `\n`, если они появились.
Преобразует произвольные выражения в строку заданного формата. Отличается от функции formatRow тем, что удаляет лишний перевод строки `\n` а конце, если он есть.
**Синтаксис**

View File

@ -40,15 +40,10 @@ inline AggregateFunctionPtr createAggregateFunctionGroupArrayImpl(const DataType
return std::make_shared<GroupArrayGeneralImpl<GroupArrayNodeString, Trait>>(argument_type, parameters, std::forward<TArgs>(args)...);
return std::make_shared<GroupArrayGeneralImpl<GroupArrayNodeGeneral, Trait>>(argument_type, parameters, std::forward<TArgs>(args)...);
// Link list implementation doesn't show noticeable performance improvement
// if (which.idx == TypeIndex::String)
// return std::make_shared<GroupArrayGeneralListImpl<GroupArrayListNodeString, Trait>>(argument_type, std::forward<TArgs>(args)...);
// return std::make_shared<GroupArrayGeneralListImpl<GroupArrayListNodeGeneral, Trait>>(argument_type, std::forward<TArgs>(args)...);
}
template <bool Tlast>
AggregateFunctionPtr createAggregateFunctionGroupArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
@ -79,9 +74,13 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!limit_size)
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait<false, Sampler::NONE>>(argument_types[0], parameters);
{
if (Tlast)
throw Exception("groupArrayLast make sense only with max_elems (groupArrayLast(max_elems)())", ErrorCodes::BAD_ARGUMENTS);
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait</* Thas_limit= */ false, Tlast, /* Tsampler= */ Sampler::NONE>>(argument_types[0], parameters);
}
else
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait<true, Sampler::NONE>>(argument_types[0], parameters, max_elems);
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait</* Thas_limit= */ true, Tlast, /* Tsampler= */ Sampler::NONE>>(argument_types[0], parameters, max_elems);
}
AggregateFunctionPtr createAggregateFunctionGroupArraySample(
@ -114,7 +113,7 @@ AggregateFunctionPtr createAggregateFunctionGroupArraySample(
else
seed = thread_local_rng();
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait<true, Sampler::RNG>>(argument_types[0], parameters, max_elems, seed);
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait</* Thas_limit= */ true, /* Tlast= */ false, /* Tsampler= */ Sampler::RNG>>(argument_types[0], parameters, max_elems, seed);
}
}
@ -124,8 +123,9 @@ void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("groupArray", { createAggregateFunctionGroupArray, properties });
factory.registerFunction("groupArray", { createAggregateFunctionGroupArray<false>, properties });
factory.registerFunction("groupArraySample", { createAggregateFunctionGroupArraySample, properties });
factory.registerFunction("groupArrayLast", { createAggregateFunctionGroupArray<true>, properties });
}
}

View File

@ -37,24 +37,25 @@ enum class Sampler
{
NONE,
RNG,
DETERMINATOR // TODO
};
template <bool Thas_limit, Sampler Tsampler>
template <bool Thas_limit, bool Tlast, Sampler Tsampler>
struct GroupArrayTrait
{
static constexpr bool has_limit = Thas_limit;
static constexpr bool last = Tlast;
static constexpr Sampler sampler = Tsampler;
};
template <typename Trait>
static constexpr const char * getNameByTrait()
{
if (Trait::last)
return "groupArrayLast";
if (Trait::sampler == Sampler::NONE)
return "groupArray";
else if (Trait::sampler == Sampler::RNG)
return "groupArraySample";
// else if (Trait::sampler == Sampler::DETERMINATOR) // TODO
UNREACHABLE();
}
@ -100,6 +101,8 @@ struct GroupArrayNumericData<T, false>
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
// For groupArrayLast()
size_t total_values = 0;
Array value;
};
@ -129,7 +132,7 @@ public:
String getName() const override { return getNameByTrait<Trait>(); }
void insert(Data & a, const T & v, Arena * arena) const
void insertWithSampler(Data & a, const T & v, Arena * arena) const
{
++a.total_values;
if (a.value.size() < max_elems)
@ -151,88 +154,107 @@ public:
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
const auto & row_value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
auto & cur_elems = this->data(place);
++cur_elems.total_values;
if constexpr (Trait::sampler == Sampler::NONE)
{
if (limit_num_elems && this->data(place).value.size() >= max_elems)
if (limit_num_elems && cur_elems.value.size() >= max_elems)
{
if constexpr (Trait::last)
cur_elems.value[(cur_elems.total_values - 1) % max_elems] = row_value;
return;
}
this->data(place).value.push_back(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num], arena);
cur_elems.value.push_back(row_value, arena);
}
if constexpr (Trait::sampler == Sampler::RNG)
{
auto & a = this->data(place);
++a.total_values;
if (a.value.size() < max_elems)
a.value.push_back(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num], arena);
if (cur_elems.value.size() < max_elems)
cur_elems.value.push_back(row_value, arena);
else
{
UInt64 rnd = a.genRandom(a.total_values);
UInt64 rnd = cur_elems.genRandom(cur_elems.total_values);
if (rnd < max_elems)
a.value[rnd] = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
cur_elems.value[rnd] = row_value;
}
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
if constexpr (Trait::sampler == Sampler::NONE)
{
auto & cur_elems = this->data(place);
auto & rhs_elems = this->data(rhs);
auto & cur_elems = this->data(place);
auto & rhs_elems = this->data(rhs);
if (!limit_num_elems)
if (rhs_elems.value.empty())
return;
if constexpr (Trait::last)
mergeNoSamplerLast(cur_elems, rhs_elems, arena);
else if constexpr (Trait::sampler == Sampler::NONE)
mergeNoSampler(cur_elems, rhs_elems, arena);
else if constexpr (Trait::sampler == Sampler::RNG)
mergeWithRNGSampler(cur_elems, rhs_elems, arena);
}
void mergeNoSamplerLast(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
UInt64 new_elements = std::min(static_cast<size_t>(max_elems), cur_elems.value.size() + rhs_elems.value.size());
cur_elems.value.resize_exact(new_elements, arena);
for (auto & value : rhs_elems.value)
{
cur_elems.value[cur_elems.total_values % max_elems] = value;
++cur_elems.total_values;
}
assert(rhs_elems.total_values >= rhs_elems.value.size());
cur_elems.total_values += rhs_elems.total_values - rhs_elems.value.size();
}
void mergeNoSampler(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
if (!limit_num_elems)
{
if (rhs_elems.value.size())
cur_elems.value.insertByOffsets(rhs_elems.value, 0, rhs_elems.value.size(), arena);
}
else
{
UInt64 elems_to_insert = std::min(static_cast<size_t>(max_elems) - cur_elems.value.size(), rhs_elems.value.size());
if (elems_to_insert)
cur_elems.value.insertByOffsets(rhs_elems.value, 0, elems_to_insert, arena);
}
}
void mergeWithRNGSampler(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
if (rhs_elems.total_values <= max_elems)
{
for (size_t i = 0; i < rhs_elems.value.size(); ++i)
insertWithSampler(cur_elems, rhs_elems.value[i], arena);
}
else if (cur_elems.total_values <= max_elems)
{
decltype(cur_elems.value) from;
from.swap(cur_elems.value, arena);
cur_elems.value.assign(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
cur_elems.total_values = rhs_elems.total_values;
for (size_t i = 0; i < from.size(); ++i)
insertWithSampler(cur_elems, from[i], arena);
}
else
{
cur_elems.randomShuffle();
cur_elems.total_values += rhs_elems.total_values;
for (size_t i = 0; i < max_elems; ++i)
{
if (rhs_elems.value.size())
cur_elems.value.insertByOffsets(rhs_elems.value, 0, rhs_elems.value.size(), arena);
}
else
{
UInt64 elems_to_insert = std::min(static_cast<size_t>(max_elems) - cur_elems.value.size(), rhs_elems.value.size());
if (elems_to_insert)
cur_elems.value.insertByOffsets(rhs_elems.value, 0, elems_to_insert, arena);
UInt64 rnd = cur_elems.genRandom(cur_elems.total_values);
if (rnd < rhs_elems.total_values)
cur_elems.value[i] = rhs_elems.value[i];
}
}
if constexpr (Trait::sampler == Sampler::RNG)
{
if (this->data(rhs).value.empty()) /// rhs state is empty
return;
auto & a = this->data(place);
auto & b = this->data(rhs);
if (b.total_values <= max_elems)
{
for (size_t i = 0; i < b.value.size(); ++i)
insert(a, b.value[i], arena);
}
else if (a.total_values <= max_elems)
{
decltype(a.value) from;
from.swap(a.value, arena);
a.value.assign(b.value.begin(), b.value.end(), arena);
a.total_values = b.total_values;
for (size_t i = 0; i < from.size(); ++i)
insert(a, from[i], arena);
}
else
{
a.randomShuffle();
a.total_values += b.total_values;
for (size_t i = 0; i < max_elems; ++i)
{
UInt64 rnd = a.genRandom(a.total_values);
if (rnd < b.total_values)
a.value[i] = b.value[i];
}
}
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
@ -242,6 +264,9 @@ public:
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
if constexpr (Trait::last)
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
@ -249,9 +274,6 @@ public:
rng_buf << this->data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
@ -267,9 +289,12 @@ public:
auto & value = this->data(place).value;
value.resize(size, arena);
value.resize_exact(size, arena);
buf.readStrict(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
if constexpr (Trait::last)
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
@ -278,9 +303,6 @@ public:
ReadBufferFromString rng_buf(rng_string);
rng_buf >> this->data(place).rng;
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
@ -396,6 +418,8 @@ struct GroupArrayGeneralData<Node, false>
using Allocator = MixedAlignedArenaAllocator<alignof(Node *), 4096>;
using Array = PODArray<Node *, 32, Allocator>;
// For groupArrayLast()
size_t total_values = 0;
Array value;
};
@ -430,7 +454,7 @@ public:
String getName() const override { return getNameByTrait<Trait>(); }
void insert(Data & a, const Node * v, Arena * arena) const
void insertWithSampler(Data & a, const Node * v, Arena * arena) const
{
++a.total_values;
if (a.value.size() < max_elems)
@ -452,96 +476,110 @@ public:
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & cur_elems = data(place);
++cur_elems.total_values;
if constexpr (Trait::sampler == Sampler::NONE)
{
if (limit_num_elems && data(place).value.size() >= max_elems)
if (limit_num_elems && cur_elems.value.size() >= max_elems)
{
if (Trait::last)
{
Node * node = Node::allocate(*columns[0], row_num, arena);
cur_elems.value[(cur_elems.total_values - 1) % max_elems] = node;
}
return;
}
Node * node = Node::allocate(*columns[0], row_num, arena);
data(place).value.push_back(node, arena);
cur_elems.value.push_back(node, arena);
}
if constexpr (Trait::sampler == Sampler::RNG)
{
auto & a = data(place);
++a.total_values;
if (a.value.size() < max_elems)
a.value.push_back(Node::allocate(*columns[0], row_num, arena), arena);
if (cur_elems.value.size() < max_elems)
cur_elems.value.push_back(Node::allocate(*columns[0], row_num, arena), arena);
else
{
UInt64 rnd = a.genRandom(a.total_values);
UInt64 rnd = cur_elems.genRandom(cur_elems.total_values);
if (rnd < max_elems)
a.value[rnd] = Node::allocate(*columns[0], row_num, arena);
cur_elems.value[rnd] = Node::allocate(*columns[0], row_num, arena);
}
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
if constexpr (Trait::sampler == Sampler::NONE)
mergeNoSampler(place, rhs, arena);
else if constexpr (Trait::sampler == Sampler::RNG)
mergeWithRNGSampler(place, rhs, arena);
// TODO
// else if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
auto & cur_elems = data(place);
auto & rhs_elems = data(rhs);
void ALWAYS_INLINE mergeNoSampler(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const
{
if (data(rhs).value.empty()) /// rhs state is empty
if (rhs_elems.value.empty())
return;
if constexpr (Trait::last)
mergeNoSamplerLast(cur_elems, rhs_elems, arena);
else if constexpr (Trait::sampler == Sampler::NONE)
mergeNoSampler(cur_elems, rhs_elems, arena);
else if constexpr (Trait::sampler == Sampler::RNG)
mergeWithRNGSampler(cur_elems, rhs_elems, arena);
}
void ALWAYS_INLINE mergeNoSamplerLast(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
UInt64 new_elements = std::min(static_cast<size_t>(max_elems), cur_elems.value.size() + rhs_elems.value.size());
cur_elems.value.resize_exact(new_elements, arena);
for (auto & value : rhs_elems.value)
{
cur_elems.value[cur_elems.total_values % max_elems] = value->clone(arena);
++cur_elems.total_values;
}
assert(rhs_elems.total_values >= rhs_elems.value.size());
cur_elems.total_values += rhs_elems.total_values - rhs_elems.value.size();
}
void ALWAYS_INLINE mergeNoSampler(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
UInt64 new_elems;
if (limit_num_elems)
{
if (data(place).value.size() >= max_elems)
if (cur_elems.value.size() >= max_elems)
return;
new_elems = std::min(data(rhs).value.size(), static_cast<size_t>(max_elems) - data(place).value.size());
new_elems = std::min(rhs_elems.value.size(), static_cast<size_t>(max_elems) - cur_elems.value.size());
}
else
new_elems = data(rhs).value.size();
new_elems = rhs_elems.value.size();
auto & a = data(place).value;
auto & b = data(rhs).value;
for (UInt64 i = 0; i < new_elems; ++i)
a.push_back(b[i]->clone(arena), arena);
cur_elems.value.push_back(rhs_elems.value[i]->clone(arena), arena);
}
void ALWAYS_INLINE mergeWithRNGSampler(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const
void ALWAYS_INLINE mergeWithRNGSampler(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
if (data(rhs).value.empty()) /// rhs state is empty
return;
auto & a = data(place);
auto & b = data(rhs);
if (b.total_values <= max_elems)
if (rhs_elems.total_values <= max_elems)
{
for (size_t i = 0; i < b.value.size(); ++i)
insert(a, b.value[i], arena);
for (size_t i = 0; i < rhs_elems.value.size(); ++i)
insertWithSampler(cur_elems, rhs_elems.value[i], arena);
}
else if (a.total_values <= max_elems)
else if (cur_elems.total_values <= max_elems)
{
decltype(a.value) from;
from.swap(a.value, arena);
for (auto & node : b.value)
a.value.push_back(node->clone(arena), arena);
a.total_values = b.total_values;
decltype(cur_elems.value) from;
from.swap(cur_elems.value, arena);
for (auto & node : rhs_elems.value)
cur_elems.value.push_back(node->clone(arena), arena);
cur_elems.total_values = rhs_elems.total_values;
for (size_t i = 0; i < from.size(); ++i)
insert(a, from[i], arena);
insertWithSampler(cur_elems, from[i], arena);
}
else
{
a.randomShuffle();
a.total_values += b.total_values;
cur_elems.randomShuffle();
cur_elems.total_values += rhs_elems.total_values;
for (size_t i = 0; i < max_elems; ++i)
{
UInt64 rnd = a.genRandom(a.total_values);
if (rnd < b.total_values)
a.value[i] = b.value[i]->clone(arena);
UInt64 rnd = cur_elems.genRandom(cur_elems.total_values);
if (rnd < rhs_elems.total_values)
cur_elems.value[i] = rhs_elems.value[i]->clone(arena);
}
}
}
@ -554,6 +592,9 @@ public:
for (auto & node : value)
node->write(buf);
if constexpr (Trait::last)
DB::writeIntBinary<size_t>(data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(data(place).total_values, buf);
@ -561,9 +602,6 @@ public:
rng_buf << data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
@ -582,10 +620,13 @@ public:
auto & value = data(place).value;
value.resize(elems, arena);
value.resize_exact(elems, arena);
for (UInt64 i = 0; i < elems; ++i)
value[i] = Node::read(buf, arena);
if constexpr (Trait::last)
DB::readIntBinary<size_t>(data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::readIntBinary<size_t>(data(place).total_values, buf);
@ -594,9 +635,6 @@ public:
ReadBufferFromString rng_buf(rng_string);
rng_buf >> data(place).rng;
}
// TODO
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
@ -622,222 +660,6 @@ public:
bool allocatesMemoryInArena() const override { return true; }
};
template <typename Node>
struct GroupArrayListNodeBase : public GroupArrayNodeBase<Node>
{
Node * next;
};
struct GroupArrayListNodeString : public GroupArrayListNodeBase<GroupArrayListNodeString>
{
using Node = GroupArrayListNodeString;
/// Create node from string
static Node * allocate(const IColumn & column, size_t row_num, Arena * arena)
{
StringRef string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
Node * node = reinterpret_cast<Node *>(arena->alignedAlloc(sizeof(Node) + string.size, alignof(Node)));
node->next = nullptr;
node->size = string.size;
memcpy(node->data(), string.data, string.size);
return node;
}
void insertInto(IColumn & column) { assert_cast<ColumnString &>(column).insertData(data(), size); }
};
struct GroupArrayListNodeGeneral : public GroupArrayListNodeBase<GroupArrayListNodeGeneral>
{
using Node = GroupArrayListNodeGeneral;
static Node * allocate(const IColumn & column, size_t row_num, Arena * arena)
{
const char * begin = arena->alignedAlloc(sizeof(Node), alignof(Node));
StringRef value = column.serializeValueIntoArena(row_num, *arena, begin);
Node * node = reinterpret_cast<Node *>(const_cast<char *>(begin));
node->next = nullptr;
node->size = value.size;
return node;
}
void insertInto(IColumn & column) { column.deserializeAndInsertFromArena(data()); }
};
template <typename Node>
struct GroupArrayGeneralListData
{
UInt64 elems = 0;
Node * first = nullptr;
Node * last = nullptr;
};
/// Implementation of groupArray for String or any ComplexObject via linked list
/// It has poor performance in case of many small objects
template <typename Node, typename Trait>
class GroupArrayGeneralListImpl final
: public IAggregateFunctionDataHelper<GroupArrayGeneralListData<Node>, GroupArrayGeneralListImpl<Node, Trait>>
{
static constexpr bool limit_num_elems = Trait::has_limit;
using Data = GroupArrayGeneralListData<Node>;
static Data & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const Data *>(place); }
DataTypePtr & data_type;
UInt64 max_elems;
public:
GroupArrayGeneralListImpl(const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<GroupArrayGeneralListData<Node>, GroupArrayGeneralListImpl<Node, Trait>>({data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, data_type(this->argument_types[0])
, max_elems(max_elems_)
{
}
String getName() const override { return getNameByTrait<Trait>(); }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
if (limit_num_elems && data(place).elems >= max_elems)
return;
Node * node = Node::allocate(*columns[0], row_num, arena);
if (unlikely(!data(place).first))
{
data(place).first = node;
data(place).last = node;
}
else
{
data(place).last->next = node;
data(place).last = node;
}
++data(place).elems;
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
/// It is sadly, but rhs's Arena could be destroyed
if (!data(rhs).first) /// rhs state is empty
return;
UInt64 new_elems;
UInt64 cur_elems = data(place).elems;
if (limit_num_elems)
{
if (data(place).elems >= max_elems)
return;
new_elems = std::min(data(place).elems + data(rhs).elems, static_cast<size_t>(max_elems));
}
else
{
new_elems = data(place).elems + data(rhs).elems;
}
Node * p_rhs = data(rhs).first;
Node * p_lhs;
if (unlikely(!data(place).last)) /// lhs state is empty
{
p_lhs = p_rhs->clone(arena);
data(place).first = data(place).last = p_lhs;
p_rhs = p_rhs->next;
++cur_elems;
}
else
{
p_lhs = data(place).last;
}
for (; cur_elems < new_elems; ++cur_elems)
{
Node * p_new = p_rhs->clone(arena);
p_lhs->next = p_new;
p_rhs = p_rhs->next;
p_lhs = p_new;
}
p_lhs->next = nullptr;
data(place).last = p_lhs;
data(place).elems = new_elems;
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
{
writeVarUInt(data(place).elems, buf);
Node * p = data(place).first;
while (p)
{
p->write(buf);
p = p->next;
}
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena * arena) const override
{
UInt64 elems;
readVarUInt(elems, buf);
data(place).elems = elems;
if (unlikely(elems == 0))
return;
if (unlikely(elems > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
if (limit_num_elems && unlikely(elems > max_elems))
throw Exception("Too large array size, it should not exceed " + toString(max_elems), ErrorCodes::TOO_LARGE_ARRAY_SIZE);
Node * prev = Node::read(buf, arena);
data(place).first = prev;
for (UInt64 i = 1; i < elems; ++i)
{
Node * cur = Node::read(buf, arena);
prev->next = cur;
prev = cur;
}
prev->next = nullptr;
data(place).last = prev;
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & column_array = assert_cast<ColumnArray &>(to);
auto & offsets = column_array.getOffsets();
offsets.push_back(offsets.back() + data(place).elems);
auto & column_data = column_array.getData();
if (std::is_same_v<Node, GroupArrayListNodeString>)
{
auto & string_offsets = assert_cast<ColumnString &>(column_data).getOffsets();
string_offsets.reserve(string_offsets.size() + data(place).elems);
}
Node * p = data(place).first;
while (p)
{
p->insertInto(column_data);
p = p->next;
}
}
bool allocatesMemoryInArena() const override { return true; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE
}

View File

@ -1116,6 +1116,8 @@ void ClientBase::onProfileEvents(Block & block)
/// Flush all buffers.
void ClientBase::resetOutput()
{
if (output_format)
output_format->finalize();
output_format.reset();
logs_out_stream.reset();

View File

@ -771,7 +771,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, input_format_json_try_infer_numbers_from_strings, true, "Try to infer numbers from string fields while schema inference", 0) \
M(Bool, input_format_json_validate_types_from_metadata, true, "For JSON/JSONCompact/JSONColumnsWithMetadata input formats this controls whether format parser should check if data types from input metadata match data types of the corresponding columns from the table", 0) \
M(Bool, input_format_json_read_numbers_as_strings, false, "Allow to parse numbers as strings in JSON input formats", 0) \
M(Bool, input_format_json_read_objects_as_strings, false, "Allow to parse JSON objects as strings in JSON input formats", 0) \
M(Bool, input_format_json_read_objects_as_strings, true, "Allow to parse JSON objects as strings in JSON input formats", 0) \
M(Bool, input_format_try_infer_integers, true, "Try to infer integers instead of floats while schema inference in text formats", 0) \
M(Bool, input_format_try_infer_dates, true, "Try to infer dates from string fields while schema inference in text formats", 0) \
M(Bool, input_format_try_infer_datetimes, true, "Try to infer datetimes from string fields while schema inference in text formats", 0) \

View File

@ -80,6 +80,7 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.1", {{"input_format_json_read_objects_as_strings", 0, 1, "Enable reading nested json objects as strings while object type is experimental"}}},
{"22.12", {{"max_size_to_preallocate_for_aggregation", 10'000'000, 100'000'000, "This optimizes performance"},
{"query_plan_aggregation_in_order", 0, 1, "Enable some refactoring around query plan"},
{"format_binary_max_string_size", 0, 1_GiB, "Prevent allocating large amount of memory"}}},

View File

@ -30,10 +30,26 @@ namespace ErrorCodes
void DataTypeCustomSimpleAggregateFunction::checkSupportedFunctions(const AggregateFunctionPtr & function)
{
/// TODO Make it sane.
static const std::vector<String> supported_functions{"any", "anyLast", "min",
"max", "sum", "sumWithOverflow", "groupBitAnd", "groupBitOr", "groupBitXor",
"sumMap", "minMap", "maxMap", "groupArrayArray", "groupUniqArrayArray",
"sumMappedArrays", "minMappedArrays", "maxMappedArrays"};
static const std::vector<String> supported_functions{
"any",
"anyLast",
"min",
"max",
"sum",
"sumWithOverflow",
"groupBitAnd",
"groupBitOr",
"groupBitXor",
"sumMap",
"minMap",
"maxMap",
"groupArrayArray",
"groupArrayLastArray",
"groupUniqArrayArray",
"sumMappedArrays",
"minMappedArrays",
"maxMappedArrays",
};
// check function
if (std::find(std::begin(supported_functions), std::end(supported_functions), function->getName()) == std::end(supported_functions))

View File

@ -328,7 +328,6 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
@ -342,9 +341,9 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
if (settings.output_format_parallel_formatting && getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto formatter_creator = [output_getter, sample, callback, format_settings] (WriteBuffer & output) -> OutputFormatPtr
auto formatter_creator = [output_getter, sample, format_settings] (WriteBuffer & output) -> OutputFormatPtr
{
return output_getter(output, sample, {callback}, format_settings);
return output_getter(output, sample, format_settings);
};
ParallelFormattingOutputFormat::Params builder{buf, sample, formatter_creator, settings.max_threads};
@ -357,7 +356,7 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
return format;
}
return getOutputFormat(name, buf, sample, context, callback, _format_settings);
return getOutputFormat(name, buf, sample, context, _format_settings);
}
@ -366,7 +365,6 @@ OutputFormatPtr FormatFactory::getOutputFormat(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
@ -376,15 +374,12 @@ OutputFormatPtr FormatFactory::getOutputFormat(
if (context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);
RowOutputFormatParams params;
params.callback = std::move(callback);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto format = output_getter(buf, sample, params, format_settings);
auto format = output_getter(buf, sample, format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
@ -411,9 +406,8 @@ String FormatFactory::getContentType(
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
Block empty_block;
RowOutputFormatParams empty_params;
WriteBufferFromOwnString empty_buffer;
auto format = output_getter(empty_buffer, empty_block, empty_params, format_settings);
auto format = output_getter(empty_buffer, empty_block, format_settings);
return format->getContentType();
}

View File

@ -30,9 +30,9 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
class IInputFormat;
class IOutputFormat;
class IRowOutputFormat;
struct RowInputFormatParams;
struct RowOutputFormatParams;
class ISchemaReader;
class IExternalSchemaReader;
@ -41,6 +41,7 @@ using ExternalSchemaReaderPtr = std::shared_ptr<IExternalSchemaReader>;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
using RowOutputFormatPtr = std::shared_ptr<IRowOutputFormat>;
template <typename Allocator>
struct Memory;
@ -56,10 +57,6 @@ FormatSettings getFormatSettings(ContextPtr context, const T & settings);
class FormatFactory final : private boost::noncopyable
{
public:
/// This callback allows to perform some additional actions after reading a single row.
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
/** Fast reading data from buffer and save result to memory.
* Reads at least `min_bytes` and some more until the end of the chunk, depends on the format.
* If `max_rows` is non-zero the function also stops after reading the `max_rows` number of rows
@ -72,12 +69,6 @@ public:
size_t min_bytes,
size_t max_rows)>;
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void(
const Columns & columns,
size_t row)>;
private:
using InputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
@ -88,7 +79,6 @@ private:
using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
/// Some input formats can have non trivial readPrefix() and readSuffix(),
@ -153,7 +143,6 @@ public:
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
@ -161,7 +150,6 @@ public:
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & _format_settings = std::nullopt) const;
String getContentType(

View File

@ -37,7 +37,7 @@ namespace
ProtobufReader::ProtobufReader(ReadBuffer & in_)
: in(in_)
: in(&in_)
{
}
@ -153,7 +153,7 @@ bool ProtobufReader::readFieldNumber(int & field_number_)
{
if (current_message_end == END_OF_FILE)
{
if (unlikely(in.eof()))
if (unlikely(in->eof()))
{
current_message_end = cursor;
return false;
@ -282,26 +282,26 @@ void ProtobufReader::readStringAndAppend(PaddedPODArray<UInt8> & str)
void ProtobufReader::readBinary(void* data, size_t size)
{
in.readStrict(reinterpret_cast<char*>(data), size);
in->readStrict(reinterpret_cast<char*>(data), size);
cursor += size;
}
void ProtobufReader::ignore(UInt64 num_bytes)
{
in.ignore(num_bytes);
in->ignore(num_bytes);
cursor += num_bytes;
}
void ProtobufReader::ignoreAll()
{
cursor += in.tryIgnore(std::numeric_limits<size_t>::max());
cursor += in->tryIgnore(std::numeric_limits<size_t>::max());
}
void ProtobufReader::moveCursorBackward(UInt64 num_bytes)
{
if (in.offset() < num_bytes)
if (in->offset() < num_bytes)
throwUnknownFormat();
in.position() -= num_bytes;
in->position() -= num_bytes;
cursor -= num_bytes;
}
@ -313,7 +313,7 @@ UInt64 ProtobufReader::continueReadingVarint(UInt64 first_byte)
# define PROTOBUF_READER_READ_VARINT_BYTE(byteNo) \
do \
{ \
in.readStrict(c); \
in->readStrict(c); \
++cursor; \
if constexpr ((byteNo) < 10) \
{ \
@ -352,7 +352,7 @@ void ProtobufReader::ignoreVarint()
# define PROTOBUF_READER_IGNORE_VARINT_BYTE(byteNo) \
do \
{ \
in.readStrict(c); \
in->readStrict(c); \
++cursor; \
if constexpr ((byteNo) < 10) \
{ \

View File

@ -32,7 +32,9 @@ public:
void readString(String & str);
void readStringAndAppend(PaddedPODArray<UInt8> & str);
bool eof() const { return in.eof(); }
bool eof() const { return in->eof(); }
void setReadBuffer(ReadBuffer & in_) { in = &in_; }
private:
void readBinary(void * data, size_t size);
@ -43,7 +45,7 @@ private:
UInt64 ALWAYS_INLINE readVarint()
{
char c;
in.readStrict(c);
in->readStrict(c);
UInt64 first_byte = static_cast<UInt8>(c);
++cursor;
if (likely(!(c & 0x80)))
@ -56,7 +58,7 @@ private:
void ignoreGroup();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
ReadBuffer * in;
Int64 cursor = 0;
bool root_message_has_length_delimiter = false;
size_t current_message_level = 0;

View File

@ -2511,6 +2511,11 @@ namespace
writer->endMessage(/*with_length_delimiter = */ true);
}
void reset() override
{
first_call_of_write_row = true;
}
void readRow(size_t row_num) override
{
if (first_call_of_read_row)

View File

@ -27,6 +27,7 @@ public:
virtual void setColumns(const ColumnPtr * columns, size_t num_columns) = 0;
virtual void writeRow(size_t row_num) = 0;
virtual void finalizeWrite() {}
virtual void reset() {}
virtual void setColumns(const MutableColumnPtr * columns, size_t num_columns) = 0;
virtual void readRow(size_t row_num) = 0;

View File

@ -84,6 +84,13 @@ public:
return res_column;
}
bool hasInformationAboutMonotonicity() const override { return true; }
Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const Field & /*right*/) const override
{
return {.is_monotonic = true, .is_always_monotonic = true};
}
};

File diff suppressed because it is too large Load Diff

View File

@ -29,7 +29,6 @@ namespace
* several columns to generate a string per row, such as CSV, TSV, JSONEachRow, etc.
* formatRowNoNewline(...) trims the newline character of each row.
*/
template <bool no_newline>
class FunctionFormatRow : public IFunction
{
@ -60,8 +59,20 @@ public:
for (auto i = 1u; i < arguments.size(); ++i)
arg_columns.insert(arguments[i]);
materializeBlockInplace(arg_columns);
auto out = FormatFactory::instance().getOutputFormat(format_name, buffer, arg_columns, context, [&](const Columns &, size_t row)
auto format_settings = getFormatSettings(context);
auto out = FormatFactory::instance().getOutputFormat(format_name, buffer, arg_columns, context, format_settings);
/// This function make sense only for row output formats.
auto * row_output_format = dynamic_cast<IRowOutputFormat *>(out.get());
if (!row_output_format)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName());
auto columns = arg_columns.getColumns();
for (size_t i = 0; i != input_rows_count; ++i)
{
row_output_format->writePrefixIfNeeded();
row_output_format->writeRow(columns, i);
row_output_format->finalize();
if constexpr (no_newline)
{
// replace '\n' with '\0'
@ -70,16 +81,11 @@ public:
}
else
writeChar('\0', buffer);
offsets[row] = buffer.count();
});
/// This function make sense only for row output formats.
if (!dynamic_cast<IRowOutputFormat *>(out.get()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName());
offsets[i] = buffer.count();
row_output_format->resetFormatter();
}
/// Don't write prefix if any.
out->doNotWritePrefix();
out->write(arg_columns);
return col_str;
}

View File

@ -353,7 +353,7 @@ Typical usage:
{
R"(
Returns a random number from the exponential distribution.
Accepts one parameter.
Accepts one parameter - lambda value.
Typical usage:
[example:typical]
@ -413,7 +413,7 @@ Typical usage:
{
R"(
Returns a random number from the Bernoulli distribution.
Accepts two parameters - probability of success.
Accepts one parameter - probability of success.
Typical usage:
[example:typical]
@ -458,7 +458,7 @@ Typical usage:
{
R"(
Returns a random number from the poisson distribution.
Accepts two parameters - the mean number of occurrences.
Accepts one parameter - the mean number of occurrences.
Typical usage:
[example:typical]

View File

@ -10,12 +10,12 @@ namespace ErrorCodes
}
PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= 0*/)
: BufferWithOwnMemory(start_size_), sub_buf(sub_buf_)
: BufferWithOwnMemory(start_size_), sub_buf(&sub_buf_)
{
padded &= sub_buf.isPadded();
padded &= sub_buf->isPadded();
/// Read from sub-buffer
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
@ -23,17 +23,26 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
void PeekableReadBuffer::reset()
{
checkStateCorrect();
}
void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_)
{
sub_buf = &sub_buf_;
resetImpl();
}
void PeekableReadBuffer::resetImpl()
{
peeked_size = 0;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
use_stack_memory = true;
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
sub_buf->position() = pos;
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
@ -43,20 +52,20 @@ bool PeekableReadBuffer::peekNext()
checkStateCorrect();
Position copy_from = pos;
size_t bytes_to_copy = sub_buf.available();
size_t bytes_to_copy = sub_buf->available();
if (useSubbufferOnly())
{
/// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
if (checkpoint)
copy_from = *checkpoint;
bytes_to_copy = sub_buf.buffer().end() - copy_from;
bytes_to_copy = sub_buf->buffer().end() - copy_from;
if (!bytes_to_copy)
{
sub_buf.position() = copy_from;
sub_buf->position() = copy_from;
/// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data.
bool res = sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
bool res = sub_buf->next();
BufferBase::set(sub_buf->buffer().begin(), sub_buf->buffer().size(), sub_buf->offset());
if (checkpoint)
checkpoint.emplace(pos);
@ -70,13 +79,13 @@ bool PeekableReadBuffer::peekNext()
if (useSubbufferOnly())
{
sub_buf.position() = copy_from;
sub_buf->position() = copy_from;
}
char * memory_data = getMemoryData();
/// Save unread data from sub-buffer to own memory
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_copy);
memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_copy);
/// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary
/// Otherwise, checkpoint now at the beginning of own memory
@ -106,10 +115,10 @@ bool PeekableReadBuffer::peekNext()
}
peeked_size += bytes_to_copy;
sub_buf.position() += bytes_to_copy;
sub_buf->position() += bytes_to_copy;
checkStateCorrect();
return sub_buf.next();
return sub_buf->next();
}
void PeekableReadBuffer::rollbackToCheckpoint(bool drop)
@ -152,7 +161,7 @@ bool PeekableReadBuffer::nextImpl()
if (checkpoint)
{
if (currentlyReadFromOwnMemory())
res = sub_buf.hasPendingData() || sub_buf.next();
res = sub_buf->hasPendingData() || sub_buf->next();
else
res = peekNext();
}
@ -161,21 +170,21 @@ bool PeekableReadBuffer::nextImpl()
if (useSubbufferOnly())
{
/// Load next data to sub_buf
sub_buf.position() = position();
res = sub_buf.next();
sub_buf->position() = position();
res = sub_buf->next();
}
else
{
/// All copied data have been read from own memory, continue reading from sub_buf
peeked_size = 0;
res = sub_buf.hasPendingData() || sub_buf.next();
res = sub_buf->hasPendingData() || sub_buf->next();
}
}
/// Switch to reading from sub_buf (or just update it if already switched)
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
nextimpl_working_buffer_offset = sub_buf.offset();
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
nextimpl_working_buffer_offset = sub_buf->offset();
if (checkpoint_at_end)
{
@ -199,8 +208,8 @@ void PeekableReadBuffer::checkStateCorrect() const
throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory() && pos < *checkpoint)
throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf.position())
throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf->position())
throw DB::Exception("Current position in subbuffer less than sub_buf->position()", ErrorCodes::LOGICAL_ERROR);
}
else
{
@ -294,11 +303,11 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
return; /// it's already continuous
size_t bytes_to_append = pos - sub_buf.position();
size_t bytes_to_append = pos - sub_buf->position();
resizeOwnMemoryIfNecessary(bytes_to_append);
char * memory_data = getMemoryData();
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_append);
sub_buf.position() = pos;
memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_append);
sub_buf->position() = pos;
peeked_size += bytes_to_append;
BufferBase::set(memory_data, peeked_size, peeked_size);
}
@ -306,7 +315,7 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
PeekableReadBuffer::~PeekableReadBuffer()
{
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
sub_buf->position() = pos;
}
bool PeekableReadBuffer::hasUnreadData() const

View File

@ -24,7 +24,7 @@ public:
~PeekableReadBuffer() override;
void prefetch() override { sub_buf.prefetch(); }
void prefetch() override { sub_buf->prefetch(); }
/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()
@ -71,13 +71,17 @@ public:
// without recreating the buffer.
void reset();
void setSubBuffer(ReadBuffer & sub_buf_);
private:
bool nextImpl() override;
void resetImpl();
bool peekNext();
inline bool useSubbufferOnly() const { return !peeked_size; }
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); }
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf->buffer().begin(); }
inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; }
void checkStateCorrect() const;
@ -90,7 +94,7 @@ private:
const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); }
ReadBuffer & sub_buf;
ReadBuffer * sub_buf;
size_t peeked_size = 0;
std::optional<Position> checkpoint = std::nullopt;
bool checkpoint_in_own_memory = false;

View File

@ -26,6 +26,7 @@ template <typename VectorType>
class WriteBufferFromVector : public WriteBuffer
{
public:
using ValueType = typename VectorType::value_type;
explicit WriteBufferFromVector(VectorType & vector_)
: WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
{
@ -50,9 +51,11 @@ public:
bool isFinished() const { return finalized; }
void restart()
void restart(std::optional<size_t> max_capacity = std::nullopt)
{
if (vector.empty())
if (max_capacity && vector.capacity() > max_capacity)
VectorType(initial_size, ValueType()).swap(vector);
else if (vector.empty())
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
finalized = false;
@ -68,8 +71,8 @@ private:
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT
+ sizeof(typename VectorType::value_type) - 1) /// Align up.
/ sizeof(typename VectorType::value_type));
+ sizeof(ValueType) - 1) /// Align up.
/ sizeof(ValueType));
/// Prevent further writes.
set(nullptr, 0);

View File

@ -1216,7 +1216,6 @@ void executeQuery(
compressed_buffer ? *compressed_buffer : *out_buf,
materializeBlock(pipeline.getHeader()),
context,
{},
output_format_settings);
out->setAutoFlush();

View File

@ -34,8 +34,15 @@ MutableColumns StreamingFormatExecutor::getResultColumns()
size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
{
auto & initial_buf = format->getReadBuffer();
format->setReadBuffer(buffer);
return execute();
size_t rows = execute();
/// Format destructor can touch read buffer (for example when we use PeekableReadBuffer),
/// but we cannot control lifetime of provided read buffer. To avoid heap use after free
/// we can set initial read buffer back, because initial read buffer was created before
/// format, so it will be destructed after it.
format->setReadBuffer(initial_buf);
return rows;
}
size_t StreamingFormatExecutor::execute()

View File

@ -2,6 +2,7 @@
#include <Processors/Formats/IInputFormat.h>
#include <Processors/ISimpleTransform.h>
#include <IO/EmptyReadBuffer.h>
namespace DB
{

View File

@ -38,7 +38,7 @@ public:
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
const ReadBuffer & getReadBuffer() const { return *in; }
ReadBuffer & getReadBuffer() const { return *in; }
virtual const BlockMissingValues & getMissingValues() const
{

View File

@ -65,7 +65,7 @@ static Chunk prepareTotals(Chunk chunk)
void IOutputFormat::work()
{
writePrefixIfNot();
writePrefixIfNeeded();
if (finished && !finalized)
{
@ -73,6 +73,8 @@ void IOutputFormat::work()
setRowsBeforeLimit(rows_before_limit_counter->get());
finalize();
if (auto_flush)
flush();
return;
}
@ -84,7 +86,7 @@ void IOutputFormat::work()
consume(std::move(current_chunk));
break;
case Totals:
writeSuffixIfNot();
writeSuffixIfNeeded();
if (auto totals = prepareTotals(std::move(current_chunk)))
{
consumeTotals(std::move(totals));
@ -92,7 +94,7 @@ void IOutputFormat::work()
}
break;
case Extremes:
writeSuffixIfNot();
writeSuffixIfNeeded();
consumeExtremes(std::move(current_chunk));
break;
}
@ -110,7 +112,7 @@ void IOutputFormat::flush()
void IOutputFormat::write(const Block & block)
{
writePrefixIfNot();
writePrefixIfNeeded();
consume(Chunk(block.getColumns(), block.rows()));
if (auto_flush)
@ -121,9 +123,10 @@ void IOutputFormat::finalize()
{
if (finalized)
return;
writePrefixIfNot();
writeSuffixIfNot();
writePrefixIfNeeded();
writeSuffixIfNeeded();
finalizeImpl();
finalizeBuffers();
finalized = true;
}

View File

@ -61,13 +61,13 @@ public:
void setTotals(const Block & totals)
{
writeSuffixIfNot();
writeSuffixIfNeeded();
consumeTotals(Chunk(totals.getColumns(), totals.rows()));
are_totals_written = true;
}
void setExtremes(const Block & extremes)
{
writeSuffixIfNot();
writeSuffixIfNeeded();
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
}
@ -76,6 +76,14 @@ public:
void doNotWritePrefix() { need_write_prefix = false; }
void resetFormatter()
{
need_write_prefix = true;
need_write_suffix = true;
finalized = false;
resetFormatterImpl();
}
/// Reset the statistics watch to a specific point in time
/// If set to not running it will stop on the call (elapsed = now() - given start)
void setStartTime(UInt64 start, bool is_running)
@ -85,17 +93,7 @@ public:
statistics.watch.stop();
}
protected:
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
void writePrefixIfNot()
void writePrefixIfNeeded()
{
if (need_write_prefix)
{
@ -104,7 +102,11 @@ protected:
}
}
void writeSuffixIfNot()
protected:
friend class ParallelFormattingOutputFormat;
void writeSuffixIfNeeded()
{
if (need_write_suffix)
{
@ -113,6 +115,15 @@ protected:
}
}
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void finalizeBuffers() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
virtual void resetFormatterImpl() {}
/// Methods-helpers for parallel formatting.
/// Set the number of rows that was already read in

View File

@ -10,12 +10,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_)
IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_)
: IOutputFormat(header, out_)
, num_columns(header.columns())
, types(header.getDataTypes())
, serializations(header.getSerializations())
, params(params_)
{
}
@ -26,14 +25,10 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
for (size_t row = 0; row < num_rows; ++row)
{
if (!first_row || getRowsReadBefore() != 0)
if (haveWrittenData())
writeRowBetweenDelimiter();
write(columns, row);
if (params.callback)
params.callback(columns, row);
first_row = false;
}
}

View File

@ -9,14 +9,6 @@
namespace DB
{
struct RowOutputFormatParams
{
using WriteCallback = std::function<void(const Columns & columns,size_t row)>;
// Callback used to indicate that another row is written.
WriteCallback callback;
};
class WriteBuffer;
/** Output format that writes data row by row.
@ -24,10 +16,17 @@ class WriteBuffer;
class IRowOutputFormat : public IOutputFormat
{
public:
using Params = RowOutputFormatParams;
/// Used to work with IRowOutputFormat explicitly.
void writeRow(const Columns & columns, size_t row_num)
{
first_row = false;
write(columns, row_num);
}
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
protected:
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
IRowOutputFormat(const Block & header, WriteBuffer & out_);
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
@ -51,7 +50,6 @@ protected:
virtual void writeFieldDelimiter() {} /// delimiter between values
virtual void writeRowStartDelimiter() {} /// delimiter before each row
virtual void writeRowEndDelimiter() {} /// delimiter after each row
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
virtual void writePrefix() override {} /// delimiter before resultset
virtual void writeSuffix() override {} /// delimiter after resultset
virtual void writeBeforeTotals() {}
@ -60,10 +58,11 @@ protected:
virtual void writeAfterExtremes() {}
virtual void finalizeImpl() override {} /// Write something after resultset, totals end extremes.
bool haveWrittenData() { return !first_row || getRowsReadBefore() != 0; }
size_t num_columns;
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
};

View File

@ -173,8 +173,9 @@ NamesAndTypesList ArrowSchemaReader::readSchema()
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, stream ? "ArrowStream" : "Arrow", format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference);
return getNamesAndRecursivelyNullableTypes(header);
}
if (format_settings.schema_inference_make_columns_nullable)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();}
void registerInputFormatArrow(FormatFactory & factory)
{
@ -208,12 +209,24 @@ void registerArrowSchemaReader(FormatFactory & factory)
{
return std::make_shared<ArrowSchemaReader>(buf, false, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("Arrow", [](const FormatSettings & settings)
{
return fmt::format("schema_inference_make_columns_nullable={}", settings.schema_inference_make_columns_nullable);
});
factory.registerSchemaReader(
"ArrowStream",
[](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_shared<ArrowSchemaReader>(buf, true, settings);
});}
});
factory.registerAdditionalInfoForSchemaCacheGetter("ArrowStream", [](const FormatSettings & settings)
{
return fmt::format("schema_inference_make_columns_nullable={}", settings.schema_inference_make_columns_nullable);
});
}
}
#else

View File

@ -21,7 +21,6 @@ ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block &
: IOutputFormat(header_, out_)
, stream{stream_}
, format_settings{format_settings_}
, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
{
}
@ -65,8 +64,15 @@ void ArrowBlockOutputFormat::finalizeImpl()
"Error while closing a table: {}", status.ToString());
}
void ArrowBlockOutputFormat::resetFormatterImpl()
{
writer.reset();
arrow_ostream.reset();
}
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
arrow_ostream = std::make_shared<ArrowBufferedOutputStream>(out);
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
// TODO: should we use arrow::ipc::IpcOptions::alignment?
@ -88,7 +94,6 @@ void registerOutputFormatArrow(FormatFactory & factory)
"Arrow",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
@ -99,7 +104,6 @@ void registerOutputFormatArrow(FormatFactory & factory)
"ArrowStream",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);

View File

@ -27,6 +27,7 @@ public:
private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);

View File

@ -906,11 +906,15 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_)
, schema_registry(getConfluentSchemaRegistry(format_settings_))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(*in))
, decoder(avro::binaryDecoder())
, format_settings(format_settings_)
{
}
void AvroConfluentRowInputFormat::readPrefix()
{
input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
decoder = avro::binaryDecoder();
decoder->init(*input_stream);
}

View File

@ -163,6 +163,7 @@ public:
private:
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;

View File

@ -442,8 +442,8 @@ static avro::Codec getCodec(const std::string & codec_name)
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_)
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings))
{
@ -477,67 +477,24 @@ void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
file_writer_ptr->incr();
}
void AvroRowOutputFormat::writeSuffix()
void AvroRowOutputFormat::finalizeImpl()
{
file_writer_ptr->close();
}
void AvroRowOutputFormat::resetFormatterImpl()
{
file_writer_ptr.reset();
}
void AvroRowOutputFormat::consume(DB::Chunk chunk)
{
if (params.callback)
consumeImplWithCallback(std::move(chunk));
else
consumeImpl(std::move(chunk));
}
void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows; ++row)
{
write(columns, row);
}
}
void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows;)
{
size_t current_row = row;
/// used by WriteBufferToKafkaProducer to obtain auxiliary data
/// from the starting row of a file
writePrefixIfNot();
for (size_t row_in_file = 0;
row_in_file < settings.avro.output_rows_in_file && row < num_rows;
++row, ++row_in_file)
{
write(columns, row);
}
file_writer_ptr->flush();
writeSuffix();
need_write_prefix = true;
params.callback(columns, current_row);
}
}
void registerOutputFormatAvro(FormatFactory & factory)
{
factory.registerOutputFormat("Avro", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<AvroRowOutputFormat>(buf, sample, settings);
});
factory.markFormatHasNoAppendSupport("Avro");
}

View File

@ -46,27 +46,23 @@ private:
class AvroRowOutputFormat final : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
void consume(Chunk) override;
String getName() const override { return "AvroRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
virtual void writePrefix() override;
virtual void writeSuffix() override;
virtual void finalizeImpl() override;
virtual void resetFormatterImpl() override;
void createFileWriter();
FormatSettings settings;
AvroSerializer serializer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;
void consumeImpl(Chunk);
void consumeImplWithCallback(Chunk);
};
}

View File

@ -43,8 +43,8 @@ static String toValidUTF8String(const String & name)
}
BSONEachRowRowOutputFormat::BSONEachRowRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_), settings(settings_)
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_), settings(settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
fields.reserve(sample.columns());
@ -519,8 +519,8 @@ void registerOutputFormatBSONEachRow(FormatFactory & factory)
{
factory.registerOutputFormat(
"BSONEachRow",
[](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams & params, const FormatSettings & _format_settings)
{ return std::make_shared<BSONEachRowRowOutputFormat>(buf, sample, params, _format_settings); });
[](WriteBuffer & buf, const Block & sample, const FormatSettings & _format_settings)
{ return std::make_shared<BSONEachRowRowOutputFormat>(buf, sample, _format_settings); });
factory.markOutputFormatSupportsParallelFormatting("BSONEachRow");
}

View File

@ -47,7 +47,7 @@ class BSONEachRowRowOutputFormat final : public IRowOutputFormat
{
public:
BSONEachRowRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
String getName() const override { return "BSONEachRowRowOutputFormat"; }

View File

@ -10,8 +10,8 @@
namespace DB
{
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: IRowOutputFormat(header, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
}
@ -55,10 +55,9 @@ void registerOutputFormatRowBinary(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, params, format_settings);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};

View File

@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat final: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
String getName() const override { return "BinaryRowOutputFormat"; }

View File

@ -9,8 +9,8 @@ namespace DB
{
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@ -24,11 +24,10 @@ void CSVRowOutputFormat::writeLine(const std::vector<String> & values)
for (size_t i = 0; i < values.size(); ++i)
{
writeCSVString(values[i], out);
if (i + 1 == values.size())
writeRowEndDelimiter();
else
if (i + 1 != values.size())
writeFieldDelimiter();
}
writeRowEndDelimiter();
}
void CSVRowOutputFormat::writePrefix()
@ -36,10 +35,16 @@ void CSVRowOutputFormat::writePrefix()
const auto & sample = getPort(PortKind::Main).getHeader();
if (with_names)
{
writeLine(sample.getNames());
writeRowBetweenDelimiter();
}
if (with_types)
{
writeLine(sample.getDataTypeNames());
writeRowBetweenDelimiter();
}
}
@ -55,21 +60,38 @@ void CSVRowOutputFormat::writeFieldDelimiter()
}
void CSVRowOutputFormat::writeRowEndDelimiter()
void CSVRowOutputFormat::writeRowBetweenDelimiter()
{
if (format_settings.csv.crlf_end_of_line)
writeChar('\r', out);
writeChar('\n', out);
}
void CSVRowOutputFormat::writeSuffix()
{
/// Write '\n' after data if we had any data.
if (haveWrittenData())
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeBeforeTotals()
{
writeChar('\n', out);
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeBeforeExtremes()
{
writeChar('\n', out);
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeAfterTotals()
{
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeAfterExtremes()
{
writeRowBetweenDelimiter();
}
@ -80,10 +102,9 @@ void registerOutputFormatCSV(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, params, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};

View File

@ -20,7 +20,7 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types, const FormatSettings & format_settings_);
String getName() const override { return "CSVRowOutputFormat"; }
@ -33,15 +33,18 @@ public:
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; }
bool supportExtremes() const override { return true; }
void writeBeforeTotals() override;
void writeAfterTotals() override;
void writeBeforeExtremes() override;
void writeAfterExtremes() override;
void writePrefix() override;
void writeSuffix() override;
void writeLine(const std::vector<String> & values);
bool with_names;

View File

@ -42,10 +42,9 @@ void CapnProtoOutputStream::write(const void * buffer, size_t size)
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
: IRowOutputFormat(header_, out_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
{
schema = schema_parser.getMessageSchema(info);
checkCapnProtoSchemaStructure(schema, getPort(PortKind::Main).getHeader(), format_settings.capn_proto.enum_comparing_mode);
@ -264,10 +263,9 @@ void registerOutputFormatCapnProto(FormatFactory & factory)
factory.registerOutputFormat("CapnProto", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, params, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
});
}

View File

@ -29,7 +29,6 @@ public:
CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_);

View File

@ -88,8 +88,7 @@ void CustomSeparatedRowInputFormat::syncAfterError()
void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
CustomSeparatedFormatReader::CustomSeparatedFormatReader(

View File

@ -8,8 +8,8 @@ namespace DB
{
CustomSeparatedRowOutputFormat::CustomSeparatedRowOutputFormat(
const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_, params_)
const Block & header_, WriteBuffer & out_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_)
, with_names(with_names_)
, with_types(with_types_)
, format_settings(format_settings_)
@ -84,10 +84,9 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, params, settings, with_names, with_types);
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, settings, with_names, with_types);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -11,7 +11,7 @@ class WriteBuffer;
class CustomSeparatedRowOutputFormat final : public IRowOutputFormat
{
public:
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
String getName() const override { return "CustomSeparatedRowOutputFormat"; }

View File

@ -1,4 +1,5 @@
#include <Processors/Formats/Impl/HiveTextRowInputFormat.h>
#include <Common/assert_cast.h>
#if USE_HIVE
@ -31,12 +32,17 @@ HiveTextRowInputFormat::HiveTextRowInputFormat(
HiveTextRowInputFormat::HiveTextRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_, const FormatSettings & format_settings_)
: CSVRowInputFormat(
header_, *buf_, params_, true, false, format_settings_, std::make_unique<HiveTextFormatReader>(std::move(buf_), format_settings_))
header_, *buf_, params_, true, false, format_settings_, std::make_unique<HiveTextFormatReader>(*buf_, format_settings_)), buf(std::move(buf_))
{
}
HiveTextFormatReader::HiveTextFormatReader(std::unique_ptr<PeekableReadBuffer> buf_, const FormatSettings & format_settings_)
: CSVFormatReader(*buf_, format_settings_), buf(std::move(buf_)), input_field_names(format_settings_.hive_text.input_field_names)
void HiveTextRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}
HiveTextFormatReader::HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_)
: CSVFormatReader(buf_, format_settings_), buf(&buf_), input_field_names(format_settings_.hive_text.input_field_names)
{
}
@ -53,6 +59,12 @@ std::vector<String> HiveTextFormatReader::readTypes()
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "HiveTextRowInputFormat::readTypes is not implemented");
}
void HiveTextFormatReader::setReadBuffer(ReadBuffer & buf_)
{
buf = assert_cast<PeekableReadBuffer *>(&buf_);
CSVFormatReader::setReadBuffer(buf_);
}
void registerInputFormatHiveText(FormatFactory & factory)
{
factory.registerInputFormat(

View File

@ -18,21 +18,27 @@ public:
String getName() const override { return "HiveTextRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
HiveTextRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_, const FormatSettings & format_settings_);
std::unique_ptr<PeekableReadBuffer> buf;
};
class HiveTextFormatReader final : public CSVFormatReader
{
public:
HiveTextFormatReader(std::unique_ptr<PeekableReadBuffer> buf_, const FormatSettings & format_settings_);
HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_);
std::vector<String> readNames() override;
std::vector<String> readTypes() override;
void setReadBuffer(ReadBuffer & buf_) override;
private:
std::unique_ptr<PeekableReadBuffer> buf;
PeekableReadBuffer * buf;
std::vector<String> input_field_names;
};

View File

@ -98,8 +98,7 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}

View File

@ -34,7 +34,6 @@ void registerOutputFormatJSONColumns(FormatFactory & factory)
factory.registerOutputFormat("JSONColumns", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsBlockOutputFormat>(buf, sample, format_settings, format_settings.json.validate_utf8);

View File

@ -28,7 +28,6 @@ void JSONColumnsBlockOutputFormatBase::consume(Chunk chunk)
void JSONColumnsBlockOutputFormatBase::writeSuffix()
{
writeChunk(mono_chunk);
mono_chunk.clear();
}

View File

@ -91,12 +91,18 @@ void JSONColumnsWithMetadataBlockOutputFormat::finalizeImpl()
ostr->next();
}
void JSONColumnsWithMetadataBlockOutputFormat::resetFormatterImpl()
{
JSONColumnsBlockOutputFormat::resetFormatterImpl();
rows = 0;
statistics = Statistics();
}
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory)
{
factory.registerOutputFormat("JSONColumnsWithMetadata", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsWithMetadataBlockOutputFormat>(buf, sample, format_settings);

View File

@ -53,6 +53,7 @@ protected:
void writePrefix() override;
void writeSuffix() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void writeChunkStart() override;
void writeChunkEnd() override;

View File

@ -32,7 +32,6 @@ void registerOutputFormatJSONCompactColumns(FormatFactory & factory)
factory.registerOutputFormat("JSONCompactColumns", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactColumnsBlockOutputFormat>(buf, sample, format_settings);

View File

@ -1,7 +1,6 @@
#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
#include <IO/ReadHelpers.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/Operators.h>
#include <Formats/FormatFactory.h>
#include <Formats/verbosePrintString.h>

View File

@ -11,12 +11,11 @@ namespace DB
JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool with_types_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_)
, settings(settings_)
, with_names(with_names_)
, with_types(with_types_)
@ -53,12 +52,17 @@ void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter()
void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter()
{
writeCString("]\n", *ostr);
writeChar(']', *ostr);
}
void JSONCompactEachRowRowOutputFormat::writeRowBetweenDelimiter()
{
writeChar('\n', *ostr);
}
void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeChar('\n', *ostr);
writeRowBetweenDelimiter();
size_t columns_size = columns.size();
writeRowStartDelimiter();
for (size_t i = 0; i < columns_size; ++i)
@ -69,6 +73,7 @@ void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, siz
writeField(*columns[i], *serializations[i], row_num);
}
writeRowEndDelimiter();
writeRowBetweenDelimiter();
}
void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector<String> & values)
@ -80,7 +85,7 @@ void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector<String> & va
writeChar('\"', *ostr);
writeString(values[i], *ostr);
writeChar('\"', *ostr);
if (i != values.size() - 1)
if (i + 1 != values.size())
writeFieldDelimiter();
}
writeRowEndDelimiter();
@ -91,10 +96,22 @@ void JSONCompactEachRowRowOutputFormat::writePrefix()
const auto & header = getPort(PortKind::Main).getHeader();
if (with_names)
{
writeLine(JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, settings.json.validate_utf8));
writeRowBetweenDelimiter();
}
if (with_types)
{
writeLine(JSONUtils::makeNamesValidJSONStrings(header.getDataTypeNames(), settings, settings.json.validate_utf8));
writeRowBetweenDelimiter();
}
}
void JSONCompactEachRowRowOutputFormat::writeSuffix()
{
if (haveWrittenData())
writeChar('\n', *ostr);
}
void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
@ -112,10 +129,9 @@ void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
factory.registerOutputFormat(format_name, [yield_strings, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, with_names, with_types, yield_strings);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, format_settings, with_names, with_types, yield_strings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -17,7 +17,6 @@ public:
JSONCompactEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool with_types_,
@ -27,6 +26,7 @@ public:
private:
void writePrefix() override;
void writeSuffix() override;
void writeTotals(const Columns & columns, size_t row_num) override;
@ -34,6 +34,7 @@ private:
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; }
void consumeTotals(Chunk) override;

View File

@ -11,10 +11,9 @@ namespace DB
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: JSONRowOutputFormat(out_, header, params_, settings_, yield_strings_)
: JSONRowOutputFormat(out_, header, settings_, yield_strings_)
{
}
@ -72,10 +71,9 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
factory.registerOutputFormat("JSONCompact", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompact");
@ -83,10 +81,9 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
factory.registerOutputFormat("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, true);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStrings");

View File

@ -19,7 +19,6 @@ public:
JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);

View File

@ -278,6 +278,7 @@ void JSONEachRowRowInputFormat::resetParser()
read_columns.clear();
seen_columns.clear();
prev_positions.clear();
allow_new_rows = true;
}
void JSONEachRowRowInputFormat::readPrefix()

View File

@ -12,9 +12,8 @@ namespace DB
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_),
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_),
settings(settings_)
{
fields = JSONUtils::makeNamesValidJSONStrings(getPort(PortKind::Main).getHeader().getNames(), settings, settings.json.validate_utf8);
@ -42,49 +41,17 @@ void JSONEachRowRowOutputFormat::writeRowStartDelimiter()
void JSONEachRowRowOutputFormat::writeRowEndDelimiter()
{
// Why do we need this weird `if`?
//
// The reason is the formatRow function that is broken with respect to
// row-between delimiters. It should not write them, but it does, and then
// hacks around it by having a special formatRowNoNewline version, which, as
// you guessed, removes the newline from the end of row. But the row-between
// delimiter goes into a second row, so it turns out to be in the beginning
// of the line, and the removal doesn't work. There is also a second bug --
// the row-between delimiter in this format is written incorrectly. In fact,
// it is not written at all, and the newline is written in a row-end
// delimiter ("}\n" instead of the correct "}"). With these two bugs
// combined, the test 01420_format_row works perfectly.
//
// A proper implementation of formatRow would use IRowOutputFormat directly,
// and not write row-between delimiters, instead of using IOutputFormat
// processor and its crutch row callback. This would require exposing
// IRowOutputFormat, which we don't do now, but which can be generally useful
// for other cases such as parallel formatting, that also require a control
// flow different from the usual IOutputFormat.
//
// I just don't have time or energy to redo all of this, but I need to
// support JSON array output here, which requires proper ",\n" row-between
// delimiters. For compatibility, I preserve the bug in case of non-array
// output.
if (settings.json.array_of_rows)
{
writeChar('}', *ostr);
}
else
{
writeCString("}\n", *ostr);
}
writeCString("}", *ostr);
field_number = 0;
}
void JSONEachRowRowOutputFormat::writeRowBetweenDelimiter()
{
// We preserve an existing bug here for compatibility. See the comment above.
if (settings.json.array_of_rows)
{
writeCString(",\n", *ostr);
}
writeChar(',', *ostr);
writeChar('\n', *ostr);
}
@ -100,9 +67,9 @@ void JSONEachRowRowOutputFormat::writePrefix()
void JSONEachRowRowOutputFormat::writeSuffix()
{
if (settings.json.array_of_rows)
{
writeCString("\n]\n", *ostr);
}
else if (haveWrittenData())
writeChar('\n', *ostr);
}
@ -113,13 +80,11 @@ void registerOutputFormatJSONEachRow(FormatFactory & factory)
factory.registerOutputFormat(format, [serialize_as_strings](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = serialize_as_strings;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting(format);
};

View File

@ -17,7 +17,6 @@ public:
JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONEachRowRowOutputFormat"; }

View File

@ -10,13 +10,16 @@ namespace DB
void JSONEachRowWithProgressRowOutputFormat::writeRowStartDelimiter()
{
if (has_progress)
{
writeProgress();
writeRowBetweenDelimiter();
}
writeCString("{\"row\":{", *ostr);
}
void JSONEachRowWithProgressRowOutputFormat::writeRowEndDelimiter()
{
writeCString("}}\n", *ostr);
writeCString("}}", *ostr);
field_number = 0;
}
@ -27,7 +30,7 @@ void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
WriteBufferFromString buf(progress_line);
writeCString("{\"progress\":", buf);
progress.writeJSON(buf);
writeCString("}\n", buf);
writeCString("}", buf);
buf.finalize();
std::lock_guard lock(progress_lines_mutex);
progress_lines.emplace_back(std::move(progress_line));
@ -37,22 +40,33 @@ void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
void JSONEachRowWithProgressRowOutputFormat::flush()
{
if (has_progress)
{
if (haveWrittenData())
writeRowBetweenDelimiter();
writeProgress();
}
JSONEachRowRowOutputFormat::flush();
}
void JSONEachRowWithProgressRowOutputFormat::writeSuffix()
{
if (has_progress)
{
writeRowBetweenDelimiter();
writeProgress();
}
JSONEachRowRowOutputFormat::writeSuffix();
}
void JSONEachRowWithProgressRowOutputFormat::writeProgress()
{
std::lock_guard lock(progress_lines_mutex);
for (const auto & progress_line : progress_lines)
writeString(progress_line, *ostr);
for (size_t i = 0; i != progress_lines.size(); ++i)
{
if (i != 0)
writeRowBetweenDelimiter();
writeString(progress_lines[i], *ostr);
}
progress_lines.clear();
has_progress = false;
}
@ -62,25 +76,21 @@ void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
factory.registerOutputFormat("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, settings);
});
factory.registerOutputFormat("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = true;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, settings);
});
}

View File

@ -6,8 +6,8 @@
namespace DB
{
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, params_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
{
}
@ -71,12 +71,11 @@ void registerOutputFormatJSONObjectEachRow(FormatFactory & factory)
factory.registerOutputFormat("JSONObjectEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONObjectEachRow");
factory.markFormatHasNoAppendSupport("JSONObjectEachRow");

View File

@ -23,7 +23,6 @@ public:
JSONObjectEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONObjectEachRowRowOutputFormat"; }

View File

@ -11,10 +11,9 @@ namespace DB
JSONRowOutputFormat::JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_, params_), settings(settings_), yield_strings(yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_), settings(settings_), yield_strings(yield_strings_)
{
names = JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, true);
}
@ -126,6 +125,13 @@ void JSONRowOutputFormat::finalizeImpl()
ostr->next();
}
void JSONRowOutputFormat::resetFormatterImpl()
{
RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
row_count = 0;
statistics = Statistics();
}
void JSONRowOutputFormat::onProgress(const Progress & value)
{
@ -138,10 +144,9 @@ void registerOutputFormatJSON(FormatFactory & factory)
factory.registerOutputFormat("JSON", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
@ -150,10 +155,9 @@ void registerOutputFormatJSON(FormatFactory & factory)
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");

View File

@ -19,7 +19,6 @@ public:
JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
@ -57,6 +56,7 @@ protected:
void writeAfterExtremes() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);

View File

@ -5,8 +5,8 @@
namespace DB
{
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_) {}
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_) {}
void MarkdownRowOutputFormat::writePrefix()
{
@ -47,7 +47,18 @@ void MarkdownRowOutputFormat::writeFieldDelimiter()
void MarkdownRowOutputFormat::writeRowEndDelimiter()
{
writeCString(" |\n", out);
writeCString(" |", out);
}
void MarkdownRowOutputFormat::writeRowBetweenDelimiter()
{
writeChar('\n', out);
}
void MarkdownRowOutputFormat::writeSuffix()
{
if (haveWrittenData())
writeChar('\n', out);
}
void MarkdownRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
@ -60,10 +71,9 @@ void registerOutputFormatMarkdown(FormatFactory & factory)
factory.registerOutputFormat("Markdown", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Markdown");

View File

@ -12,7 +12,7 @@ class ReadBuffer;
class MarkdownRowOutputFormat final : public IRowOutputFormat
{
public:
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "MarkdownRowOutputFormat"; }
@ -21,6 +21,7 @@ private:
/// |columnName1|columnName2|...|columnNameN|
/// |:-:|:-:|...|:-:|
void writePrefix() override;
void writeSuffix() override;
/// Write '|' before each row
void writeRowStartDelimiter() override;
@ -28,8 +29,11 @@ private:
/// Write '|' between values
void writeFieldDelimiter() override;
/// Write '|\n' after each row
void writeRowEndDelimiter() override ;
/// Write '|' at the end of each row
void writeRowEndDelimiter() override;
/// Write '\n' after each row
void writeRowBetweenDelimiter() override;
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;

View File

@ -429,8 +429,7 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(in_);
buf->setSubBuffer(in_);
}
MsgPackSchemaReader::MsgPackSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -32,8 +32,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), packer(out_), format_settings(format_settings_) {}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), packer(out_), format_settings(format_settings_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
@ -226,10 +226,9 @@ void registerOutputFormatMsgPack(FormatFactory & factory)
factory.registerOutputFormat("MsgPack", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("MsgPack");
}

View File

@ -17,7 +17,7 @@ namespace DB
class MsgPackRowOutputFormat final : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "MsgPackRowOutputFormat"; }

View File

@ -109,7 +109,6 @@ void registerOutputFormatMySQLWire(FormatFactory & factory)
"MySQLWire",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
}

View File

@ -115,7 +115,6 @@ void registerOutputFormatNative(FormatFactory & factory)
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeOutputFormat>(buf, sample);

View File

@ -13,7 +13,6 @@ void registerOutputFormatNull(FormatFactory & factory)
factory.registerOutputFormat("Null", [](
WriteBuffer &,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NullOutputFormat>(sample);

View File

@ -100,7 +100,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatODBCDriver2(FormatFactory & factory)
{
factory.registerOutputFormat(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
});

View File

@ -189,8 +189,9 @@ NamesAndTypesList ORCSchemaReader::readSchema()
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "ORC", format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference);
return getNamesAndRecursivelyNullableTypes(header);
}
if (format_settings.schema_inference_make_columns_nullable)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();}
void registerInputFormatORC(FormatFactory & factory)
{
@ -216,6 +217,11 @@ void registerORCSchemaReader(FormatFactory & factory)
return std::make_shared<ORCSchemaReader>(buf, settings);
}
);
factory.registerAdditionalInfoForSchemaCacheGetter("ORC", [](const FormatSettings & settings)
{
return fmt::format("schema_inference_make_columns_nullable={}", settings.schema_inference_make_columns_nullable);
});
}
}

View File

@ -515,6 +515,11 @@ void ORCBlockOutputFormat::finalizeImpl()
writer->close();
}
void ORCBlockOutputFormat::resetFormatterImpl()
{
writer.reset();
}
void ORCBlockOutputFormat::prepareWriter()
{
const Block & header = getPort(PortKind::Main).getHeader();
@ -531,7 +536,6 @@ void registerOutputFormatORC(FormatFactory & factory)
factory.registerOutputFormat("ORC", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);

View File

@ -44,6 +44,7 @@ public:
private:
void consume(Chunk chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
std::unique_ptr<orc::Type> getORCType(const DataTypePtr & type);

View File

@ -20,6 +20,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/**
* ORDER-PRESERVING parallel formatting of data formats.
* The idea is similar to ParallelParsingInputFormat.
@ -167,6 +172,12 @@ private:
void finalizeImpl() override;
void resetFormatterImpl() override
{
/// Resetting parallel formatting is not obvious and it's not used anywhere
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method resetFormatterImpl is not implemented for parallel formatting");
}
InternalFormatterCreator internal_formatter_creator;
/// Status to synchronize multiple threads.

View File

@ -187,7 +187,9 @@ NamesAndTypesList ParquetSchemaReader::readSchema()
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "Parquet", format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference);
return getNamesAndRecursivelyNullableTypes(header);
if (format_settings.schema_inference_make_columns_nullable)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}
void registerInputFormatParquet(FormatFactory & factory)
@ -214,6 +216,11 @@ void registerParquetSchemaReader(FormatFactory & factory)
return std::make_shared<ParquetSchemaReader>(buf, settings);
}
);
factory.registerAdditionalInfoForSchemaCacheGetter("Parquet", [](const FormatSettings & settings)
{
return fmt::format("schema_inference_make_columns_nullable={}", settings.schema_inference_make_columns_nullable);
});
}
}

View File

@ -74,13 +74,17 @@ void ParquetBlockOutputFormat::finalizeImpl()
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
void ParquetBlockOutputFormat::resetFormatterImpl()
{
file_writer.reset();
}
void registerOutputFormatParquet(FormatFactory & factory)
{
factory.registerOutputFormat(
"Parquet",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);

View File

@ -36,6 +36,7 @@ public:
private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
const FormatSettings format_settings;

View File

@ -66,7 +66,6 @@ void registerOutputFormatPostgreSQLWire(FormatFactory & factory)
"PostgreSQLWire",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<PostgreSQLOutputFormat>(buf, sample, settings); });
}
}

View File

@ -54,6 +54,11 @@ protected:
const IColumn & column, const ISerialization & serialization, size_t row_num,
size_t value_width, size_t pad_to_width, bool align_right);
void resetFormatterImpl() override
{
total_rows = 0;
}
private:
bool mono_block;
/// For mono_block == true only
@ -68,7 +73,6 @@ void registerPrettyFormatWithNoEscapesAndMonoBlock(FormatFactory & factory, cons
fact.registerOutputFormat(name, [no_escapes, mono_block](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
if (no_escapes)

View File

@ -82,9 +82,8 @@ static Float64 tryParseFloat(const String & s)
PrometheusTextOutputFormat::PrometheusTextOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, string_serialization(DataTypeString().getDefaultSerialization())
, format_settings(format_settings_)
{
@ -339,10 +338,9 @@ void registerOutputFormatPrometheus(FormatFactory & factory)
factory.registerOutputFormat(FORMAT_NAME, [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<PrometheusTextOutputFormat>(buf, sample, params, settings);
return std::make_shared<PrometheusTextOutputFormat>(buf, sample, settings);
});
}

View File

@ -20,7 +20,6 @@ public:
PrometheusTextOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "PrometheusTextOutputFormat"; }

View File

@ -30,6 +30,12 @@ ProtobufListInputFormat::ProtobufListInputFormat(
{
}
void ProtobufListInputFormat::setReadBuffer(ReadBuffer & in_)
{
reader->setReadBuffer(in_);
IRowInputFormat::setReadBuffer(in_);
}
bool ProtobufListInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension)
{
if (reader->eof())

View File

@ -33,6 +33,8 @@ public:
String getName() const override { return "ProtobufListInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;

View File

@ -13,10 +13,9 @@ namespace DB
ProtobufListOutputFormat::ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
@ -42,18 +41,21 @@ void ProtobufListOutputFormat::finalizeImpl()
serializer->finalizeWrite();
}
void ProtobufListOutputFormat::resetFormatterImpl()
{
serializer->reset();
}
void registerOutputFormatProtobufList(FormatFactory & factory)
{
factory.registerOutputFormat(
"ProtobufList",
[](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufListOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings, "Protobuf", true),
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
settings.protobuf.output_nullables_with_google_wrappers);
});
}

Some files were not shown because too many files have changed in this diff Show More