Improve and refactor Kafka/StorageMQ/NATS and data formats

This commit is contained in:
avogar 2022-10-28 16:41:10 +00:00
parent 7980920bd7
commit 8e13d1f1ec
157 changed files with 2770 additions and 1717 deletions

View File

@ -34,7 +34,14 @@ SETTINGS
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
```
Required parameters:
@ -46,13 +53,20 @@ Required parameters:
Optional parameters:
- `kafka_row_delimiter` — Delimiter character, which ends the message.
- `kafka_row_delimiter` — Delimiter character, which ends the message. **This setting is deprecated and no longer used; it is kept only for compatibility reasons.**
- `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer (default: `0`). When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block).
- `kafka_num_consumers` — The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed. Default: `1`.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block. Default: `0`.
- `kafka_client_id` — Client identifier. Empty by default.
- `kafka_poll_timeout_ms` — Timeout for single poll from Kafka. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flushes data independently, in parallel (otherwise, rows from several consumers are squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for the Kafka engine. Possible values: `default`, `stream`.
- `kafka_commit_on_select` — Commit messages when a select query is made. Default: `false`.
- `kafka_max_rows_per_message` — The maximum number of rows written in one Kafka message for row-based formats. Default: `1`.
Examples:
@ -94,7 +108,7 @@ Do not use this method in new projects. If possible, switch old projects to the
``` sql
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
```
</details>
@ -193,6 +207,14 @@ Example:
- `_headers.name` — Array of the message's header keys.
- `_headers.value` — Array of the message's header values.
## Data formats support {#data-formats-support}
Kafka engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one Kafka message depends on whether the format is row-based or block-based:
- For row-based formats, the number of rows in one Kafka message can be controlled by the setting `kafka_max_rows_per_message`.
- For block-based formats, a block cannot be divided into smaller parts, but the number of rows in one block can be controlled by the general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
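For example, a minimal sketch of a producer table using a row-based format; the broker address, topic, group, and table name are hypothetical:

``` sql
CREATE TABLE kafka_out (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'events_group',
         kafka_format = 'JSONEachRow',
         kafka_max_rows_per_message = 100;
```

With this table, each `INSERT` produces Kafka messages of at most 100 `JSONEachRow` rows.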
**See Also**
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

View File

@ -37,8 +37,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[nats_max_block_size = N,]
[nats_flush_interval_ms = N,]
[nats_username = 'user',]
[nats_password = 'password']
[redis_password = 'clickhouse']
[nats_password = 'password',]
[nats_token = 'clickhouse',]
[nats_startup_connect_tries = '5',]
[nats_max_rows_per_message = 1]
```
Required parameters:
@ -49,7 +51,7 @@ Required parameters:
Optional parameters:
- `nats_row_delimiter` Delimiter character, which ends the message.
- `nats_row_delimiter` – Delimiter character, which ends the message. **This setting is deprecated and no longer used; it is kept only for compatibility reasons.**
- `nats_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `nats_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `nats_queue_group` Name for queue group of NATS subscribers. Default is the table name.
@ -57,11 +59,13 @@ Optional parameters:
- `nats_reconnect_wait` Amount of time in milliseconds to sleep between each reconnect attempt. Default: `5000`.
- `nats_server_list` - Server list for connection. Can be specified to connect to NATS cluster.
- `nats_skip_broken_messages` - NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N` then the engine skips *N* NATS messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` - Number of row collected by poll(s) for flushing data from NATS.
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS.
- `nats_max_block_size` - Number of rows collected by poll(s) for flushing data from NATS. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `nats_username` - NATS username.
- `nats_password` - NATS password.
- `nats_token` - NATS auth token.
- `nats_startup_connect_tries` - Number of connection attempts at startup. Default: `5`.
- `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. Default: `1`.
SSL connection:
@ -159,6 +163,14 @@ If you want to change the target table by using `ALTER`, we recommend disabling
## Virtual Columns {#virtual-columns}
- `_subject` - NATS message subject.
- `_subject` - NATS message subject.
## Data formats support {#data-formats-support}
NATS engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one NATS message depends on whether the format is row-based or block-based:
- For row-based formats, the number of rows in one NATS message can be controlled by the setting `nats_max_rows_per_message`.
- For block-based formats, a block cannot be divided into smaller parts, but the number of rows in one block can be controlled by the general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
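A similar sketch for NATS, assuming a server on `localhost:4222` and a hypothetical subject and table name:

``` sql
CREATE TABLE nats_out (key UInt64, value String)
ENGINE = NATS
SETTINGS nats_url = 'localhost:4222',
         nats_subjects = 'events',
         nats_format = 'CSV',
         nats_max_rows_per_message = 50;
```

Each `INSERT` into this table then produces NATS messages of at most 50 `CSV` rows.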
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/nats/) <!--hide-->

View File

@ -37,8 +37,16 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish']
[rabbitmq_flush_interval_ms = N,]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
[rabbitmq_queue_consume = false,]
[rabbitmq_address = '',]
[rabbitmq_vhost = '/',]
[rabbitmq_username = '',]
[rabbitmq_password = '',]
[rabbitmq_commit_on_select = false,]
[rabbitmq_max_rows_per_message = 1]
```
Required parameters:
@ -49,19 +57,27 @@ Required parameters:
Optional parameters:
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` A comma-separated list of routing keys.
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
- `rabbitmq_queue_settings_list` - allows to set RabbitMQ settings when creating a queue. Available settings: `x-max-length`, `x-max-length-bytes`, `x-message-ttl`, `x-expires`, `x-priority`, `x-max-priority`, `x-overflow`, `x-dead-letter-exchange`, `x-queue-type`. The `durable` setting is enabled automatically for the queue.
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` A comma-separated list of routing keys.
- `rabbitmq_row_delimiter` – Delimiter character, which ends the message. **This setting is deprecated and no longer used; it is kept only for compatibility reasons.**
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` – The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. Default: `1`.
- `rabbitmq_num_queues` Total number of queues. Increasing this number can significantly improve performance. Default: `1`.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), delivery mode in `INSERT` queries will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `rabbitmq_max_block_size` - Number of rows collected before flushing data from RabbitMQ. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `rabbitmq_flush_interval_ms` - Timeout for flushing data from RabbitMQ. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `rabbitmq_queue_settings_list` - Allows setting RabbitMQ settings when creating a queue. Available settings: `x-max-length`, `x-max-length-bytes`, `x-message-ttl`, `x-expires`, `x-priority`, `x-max-priority`, `x-overflow`, `x-dead-letter-exchange`, `x-queue-type`. The `durable` setting is enabled automatically for the queue.
- `rabbitmq_address` - Address for connection. Use either this setting or `rabbitmq_host_port`.
- `rabbitmq_vhost` - RabbitMQ vhost. Default: `'/'`.
- `rabbitmq_queue_consume` - Use user-defined queues and do not perform any RabbitMQ setup (declaring exchanges, queues, bindings). Default: `false`.
- `rabbitmq_username` - RabbitMQ username.
- `rabbitmq_password` - RabbitMQ password.
- `rabbitmq_commit_on_select` - Commit messages when a select query is made. Default: `false`.
- `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default: `1`.
SSL connection:
@ -166,11 +182,20 @@ Example:
## Virtual Columns {#virtual-columns}
- `_exchange_name` - RabbitMQ exchange name.
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - messageID of the received message; non-empty if was set, when message was published.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.
- `_exchange_name` - RabbitMQ exchange name.
- `_channel_id` - ChannelID on which the consumer that received the message was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - messageID of the received message; non-empty if it was set when the message was published.
- `_timestamp` - timestamp of the received message; non-empty if it was set when the message was published.
## Data formats support {#data-formats-support}
RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one RabbitMQ message depends on whether the format is row-based or block-based:
- For row-based formats, the number of rows in one RabbitMQ message can be controlled by the setting `rabbitmq_max_rows_per_message`.
- For block-based formats, a block cannot be divided into smaller parts, but the number of rows in one block can be controlled by the general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
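A sketch of the block-based case, with hypothetical connection parameters and table names: `rabbitmq_max_rows_per_message` does not apply to a block-based format such as `Parquet`, so the rows per message are bounded by `max_block_size` on the insert side:

``` sql
CREATE TABLE rabbitmq_out (key UInt64, value String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'localhost:5672',
         rabbitmq_exchange_name = 'events',
         rabbitmq_format = 'Parquet';

INSERT INTO rabbitmq_out
SELECT number AS key, toString(number) AS value
FROM numbers(100000)
SETTINGS max_block_size = 8192; -- each message carries at most 8192 rows
```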
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/rabbitmq/) <!--hide-->

View File

@ -1011,6 +1011,12 @@ The default value is 7500.
The smaller the value, the more often data is flushed into the table. Setting the value too low leads to poor performance.
## stream_poll_timeout_ms {#stream_poll_timeout_ms}
Timeout for polling data from/to streaming storages.
Default value: 500.
## load_balancing {#settings-load_balancing}
Specifies the algorithm of replicas selection that is used for distributed query processing.

View File

@ -1393,7 +1393,7 @@ formatRow(format, x, y, ...)
**Returned value**
- A formatted string.
- A formatted string. (For text formats it is usually terminated with a newline character.)
**Example**
@ -1417,6 +1417,74 @@ Result:
└──────────────────────────────────┘
```
**Note**: If the format contains a prefix/suffix, it will be written in each row.
**Example**
Query:
``` sql
SELECT formatRow('CustomSeparated', number, 'good')
FROM numbers(3)
SETTINGS format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>'
```
Result:
``` text
┌─formatRow('CustomSeparated', number, 'good')─┐
<prefix>
0 good
<suffix>
<prefix>
1 good
<suffix>
<prefix>
2 good
<suffix>
└──────────────────────────────────────────────┘
```
**Note**: Only row-based formats are supported in this function.
## formatRowNoNewline
Converts arbitrary expressions into a string via the given format. Differs from `formatRow` in that this function trims the trailing `\n`, if any.
**Syntax**
``` sql
formatRowNoNewline(format, x, y, ...)
```
**Arguments**
- `format` — Text format. For example, [CSV](../../interfaces/formats.md#csv), [TSV](../../interfaces/formats.md#tabseparated).
- `x`,`y`, ... — Expressions.
**Returned value**
- A formatted string.
**Example**
Query:
``` sql
SELECT formatRowNoNewline('CSV', number, 'good')
FROM numbers(3);
```
Result:
``` text
┌─formatRowNoNewline('CSV', number, 'good')─┐
│ 0,"good" │
│ 1,"good" │
│ 2,"good" │
└───────────────────────────────────────────┘
```
## snowflakeToDateTime
Extracts the time from a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) as a [DateTime](../data-types/datetime.md) value.

View File

@ -1316,7 +1316,7 @@ formatRow(format, x, y, ...)
**Returned value**
- A formatted string.
- A formatted string. (For text formats it is usually terminated with a newline character.)
**Example**
@ -1340,6 +1340,74 @@ FROM numbers(3);
└──────────────────────────────────┘
```
**Note**: If the format contains a prefix/suffix, it will be written in each row.
**Example**
Query:
``` sql
SELECT formatRow('CustomSeparated', number, 'good')
FROM numbers(3)
SETTINGS format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>'
```
Result:
``` text
┌─formatRow('CustomSeparated', number, 'good')─┐
<prefix>
0 good
<suffix>
<prefix>
1 good
<suffix>
<prefix>
2 good
<suffix>
└──────────────────────────────────────────────┘
```
**Note**: This function supports only row-based output formats.
## formatRowNoNewline {#formatrownonewline}
Converts arbitrary expressions into a string in the given format. Differs from `formatRow` in that this function trims the trailing `\n`, if any.
**Syntax**
``` sql
formatRowNoNewline(format, x, y, ...)
```
**Arguments**
- `format` — Text format. For example, [CSV](../../interfaces/formats.md#csv), [TSV](../../interfaces/formats.md#tabseparated).
- `x`,`y`, ... — Expressions.
**Returned value**
- A formatted string. (In text formats, without a trailing newline.)
**Example**
Query:
``` sql
SELECT formatRowNoNewline('CSV', number, 'good')
FROM numbers(3);
```
Result:
``` text
┌─formatRowNoNewline('CSV', number, 'good')─┐
│ 0,"good" │
│ 1,"good" │
│ 2,"good" │
└───────────────────────────────────────────┘
```
## snowflakeToDateTime {#snowflaketodatetime}
Extracts the time from a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) as a [DateTime](../data-types/datetime.md) value.

View File

@ -318,7 +318,6 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
@ -332,9 +331,9 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
if (settings.output_format_parallel_formatting && getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto formatter_creator = [output_getter, sample, callback, format_settings] (WriteBuffer & output) -> OutputFormatPtr
auto formatter_creator = [output_getter, sample, format_settings] (WriteBuffer & output) -> OutputFormatPtr
{
return output_getter(output, sample, {callback}, format_settings);
return output_getter(output, sample, format_settings);
};
ParallelFormattingOutputFormat::Params builder{buf, sample, formatter_creator, settings.max_threads};
@ -347,7 +346,7 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
return format;
}
return getOutputFormat(name, buf, sample, context, callback, _format_settings);
return getOutputFormat(name, buf, sample, context, _format_settings);
}
@ -356,7 +355,6 @@ OutputFormatPtr FormatFactory::getOutputFormat(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
@ -366,15 +364,12 @@ OutputFormatPtr FormatFactory::getOutputFormat(
if (context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);
RowOutputFormatParams params;
params.callback = std::move(callback);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto format = output_getter(buf, sample, params, format_settings);
auto format = output_getter(buf, sample, format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
@ -401,9 +396,8 @@ String FormatFactory::getContentType(
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
Block empty_block;
RowOutputFormatParams empty_params;
WriteBufferFromOwnString empty_buffer;
auto format = output_getter(empty_buffer, empty_block, empty_params, format_settings);
auto format = output_getter(empty_buffer, empty_block, format_settings);
return format->getContentType();
}

View File

@ -30,9 +30,9 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
class IInputFormat;
class IOutputFormat;
class IRowOutputFormat;
struct RowInputFormatParams;
struct RowOutputFormatParams;
class ISchemaReader;
class IExternalSchemaReader;
@ -41,6 +41,7 @@ using ExternalSchemaReaderPtr = std::shared_ptr<IExternalSchemaReader>;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
using RowOutputFormatPtr = std::shared_ptr<IRowOutputFormat>;
template <typename Allocator>
struct Memory;
@ -56,10 +57,6 @@ FormatSettings getFormatSettings(ContextPtr context, const T & settings);
class FormatFactory final : private boost::noncopyable
{
public:
/// This callback allows to perform some additional actions after reading a single row.
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
/** Fast reading data from buffer and save result to memory.
* Reads at least `min_bytes` and some more until the end of the chunk, depends on the format.
* If `max_rows` is non-zero the function also stops after reading the `max_rows` number of rows
@ -88,7 +85,6 @@ private:
using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
/// Some input formats can have non-trivial readPrefix() and readSuffix(),
@ -152,7 +148,6 @@ public:
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
@ -160,7 +155,6 @@ public:
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & _format_settings = std::nullopt) const;
String getContentType(

View File

@ -37,7 +37,7 @@ namespace
ProtobufReader::ProtobufReader(ReadBuffer & in_)
: in(in_)
: in(&in_)
{
}
@ -153,7 +153,7 @@ bool ProtobufReader::readFieldNumber(int & field_number_)
{
if (current_message_end == END_OF_FILE)
{
if (unlikely(in.eof()))
if (unlikely(in->eof()))
{
current_message_end = cursor;
return false;
@ -282,26 +282,26 @@ void ProtobufReader::readStringAndAppend(PaddedPODArray<UInt8> & str)
void ProtobufReader::readBinary(void* data, size_t size)
{
in.readStrict(reinterpret_cast<char*>(data), size);
in->readStrict(reinterpret_cast<char*>(data), size);
cursor += size;
}
void ProtobufReader::ignore(UInt64 num_bytes)
{
in.ignore(num_bytes);
in->ignore(num_bytes);
cursor += num_bytes;
}
void ProtobufReader::ignoreAll()
{
cursor += in.tryIgnore(std::numeric_limits<size_t>::max());
cursor += in->tryIgnore(std::numeric_limits<size_t>::max());
}
void ProtobufReader::moveCursorBackward(UInt64 num_bytes)
{
if (in.offset() < num_bytes)
if (in->offset() < num_bytes)
throwUnknownFormat();
in.position() -= num_bytes;
in->position() -= num_bytes;
cursor -= num_bytes;
}
@ -313,7 +313,7 @@ UInt64 ProtobufReader::continueReadingVarint(UInt64 first_byte)
# define PROTOBUF_READER_READ_VARINT_BYTE(byteNo) \
do \
{ \
in.readStrict(c); \
in->readStrict(c); \
++cursor; \
if constexpr ((byteNo) < 10) \
{ \
@ -352,7 +352,7 @@ void ProtobufReader::ignoreVarint()
# define PROTOBUF_READER_IGNORE_VARINT_BYTE(byteNo) \
do \
{ \
in.readStrict(c); \
in->readStrict(c); \
++cursor; \
if constexpr ((byteNo) < 10) \
{ \

View File

@ -32,7 +32,9 @@ public:
void readString(String & str);
void readStringAndAppend(PaddedPODArray<UInt8> & str);
bool eof() const { return in.eof(); }
bool eof() const { return in->eof(); }
void setReadBuffer(ReadBuffer & in_) { in = &in_; }
private:
void readBinary(void * data, size_t size);
@ -43,7 +45,7 @@ private:
UInt64 ALWAYS_INLINE readVarint()
{
char c;
in.readStrict(c);
in->readStrict(c);
UInt64 first_byte = static_cast<UInt8>(c);
++cursor;
if (likely(!(c & 0x80)))
@ -56,7 +58,7 @@ private:
void ignoreGroup();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
ReadBuffer * in;
Int64 cursor = 0;
bool root_message_has_length_delimiter = false;
size_t current_message_level = 0;

View File

@ -2508,6 +2508,11 @@ namespace
writer->endMessage(/*with_length_delimiter = */ true);
}
void reset() override
{
first_call_of_write_row = true;
}
void readRow(size_t row_num) override
{
if (first_call_of_read_row)

View File

@ -27,6 +27,7 @@ public:
virtual void setColumns(const ColumnPtr * columns, size_t num_columns) = 0;
virtual void writeRow(size_t row_num) = 0;
virtual void finalizeWrite() {}
virtual void reset() {}
virtual void setColumns(const MutableColumnPtr * columns, size_t num_columns) = 0;
virtual void readRow(size_t row_num) = 0;

View File

@ -27,23 +27,18 @@ namespace
/** formatRow(<format>, x, y, ...) is a function that allows you to use RowOutputFormat over
* several columns to generate a string per row, such as CSV, TSV, JSONEachRow, etc.
* formatRowNoNewline(...) trims the newline character of each row.
*/
template <bool no_newline>
class FunctionFormatRow : public IFunction
{
public:
static constexpr auto name = "formatRow";
static constexpr auto name = no_newline ? "formatRowNoNewline" : "formatRow";
FunctionFormatRow(const String & format_name_, ContextPtr context_) : format_name(format_name_), context(context_)
{
if (!FormatFactory::instance().getAllFormats().contains(format_name))
throw Exception("Unknown format " + format_name, ErrorCodes::UNKNOWN_FORMAT);
/// It's impossible to output separate rows in Avro format, because of specific
/// implementation (we cannot separate table schema and rows, rows are written
/// in our buffer in batches)
if (format_name == "Avro")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format Avro is not supported in function {}");
}
String getName() const override { return name; }
@ -65,24 +60,30 @@ public:
arg_columns.insert(arguments[i]);
materializeBlockInplace(arg_columns);
auto format_settings = getFormatSettings(context);
/// For the SQLInsert output format we should set max_batch_size to 1 so that
/// each line will contain the prefix INSERT INTO ... (otherwise only a subset of lines
/// will contain it, according to the max_batch_size setting)
format_settings.sql_insert.max_batch_size = 1;
auto out = FormatFactory::instance().getOutputFormat(format_name, buffer, arg_columns, context, {}, format_settings);
auto out = FormatFactory::instance().getOutputFormat(format_name, buffer, arg_columns, context, format_settings);
/// This function makes sense only for row output formats.
auto * row_output_format = dynamic_cast<IRowOutputFormat *>(out.get());
if (!row_output_format)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName());
auto & working_buf = row_output_format->getWriteBuffer();
auto columns = arg_columns.getColumns();
for (size_t i = 0; i != input_rows_count; ++i)
{
row_output_format->write(columns, i);
writeChar('\0', working_buf);
offsets[i] = working_buf.count();
row_output_format->writePrefixIfNot();
row_output_format->writeRow(columns, i);
row_output_format->finalize();
if constexpr (no_newline)
{
// replace '\n' with '\0'
if (buffer.position() != buffer.buffer().begin() && buffer.position()[-1] == '\n')
buffer.position()[-1] = '\0';
}
else
writeChar('\0', buffer);
offsets[i] = buffer.count();
row_output_format->resetFormatter();
}
return col_str;
@ -93,10 +94,11 @@ private:
ContextPtr context;
};
template <bool no_newline>
class FormatRowOverloadResolver : public IFunctionOverloadResolver
{
public:
static constexpr auto name = "formatRow";
static constexpr auto name = no_newline ? "formatRowNoNewline" : "formatRow";
static FunctionOverloadResolverPtr create(ContextPtr context) { return std::make_unique<FormatRowOverloadResolver>(context); }
explicit FormatRowOverloadResolver(ContextPtr context_) : context(context_) { }
String getName() const override { return name; }
@ -114,7 +116,7 @@ public:
if (const auto * name_col = checkAndGetColumnConst<ColumnString>(arguments.at(0).column.get()))
return std::make_unique<FunctionToFunctionBaseAdaptor>(
std::make_shared<FunctionFormatRow>(name_col->getValue<String>(), context),
std::make_shared<FunctionFormatRow<no_newline>>(name_col->getValue<String>(), context),
collections::map<DataTypes>(arguments, [](const auto & elem) { return elem.type; }),
return_type);
else
@ -131,7 +133,8 @@ private:
REGISTER_FUNCTION(FormatRow)
{
factory.registerFunction<FormatRowOverloadResolver>();
factory.registerFunction<FormatRowOverloadResolver<true>>();
factory.registerFunction<FormatRowOverloadResolver<false>>();
}
}

View File

@ -26,6 +26,7 @@ template <typename VectorType>
class WriteBufferFromVector : public WriteBuffer
{
public:
using ValueType = typename VectorType::value_type;
explicit WriteBufferFromVector(VectorType & vector_)
: WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
{
@ -50,9 +51,11 @@ public:
bool isFinished() const { return finalized; }
void restart()
void restart(std::optional<size_t> max_capacity = std::nullopt)
{
if (vector.empty())
if (max_capacity && vector.capacity() > max_capacity)
VectorType(initial_size, ValueType()).swap(vector);
else if (vector.empty())
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
finalized = false;
@ -68,8 +71,8 @@ private:
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT
+ sizeof(typename VectorType::value_type) - 1) /// Align up.
/ sizeof(typename VectorType::value_type));
+ sizeof(ValueType) - 1) /// Align up.
/ sizeof(ValueType));
/// Prevent further writes.
set(nullptr, 0);

View File

@ -1199,7 +1199,6 @@ void executeQuery(
compressed_buffer ? *compressed_buffer : *out_buf,
materializeBlock(pipeline.getHeader()),
context,
{},
output_format_settings);
out->setAutoFlush();

View File

@ -126,6 +126,7 @@ void IOutputFormat::finalize()
writePrefixIfNot();
writeSuffixIfNot();
finalizeImpl();
finalizeBuffers();
finalized = true;
}

View File

@ -76,17 +76,13 @@ public:
void doNotWritePrefix() { need_write_prefix = false; }
virtual WriteBuffer & getWriteBuffer() const { return out; }
protected:
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
void resetFormatter()
{
need_write_prefix = true;
need_write_suffix = true;
finalized = false;
resetFormatterImpl();
}
void writePrefixIfNot()
{
@ -97,6 +93,9 @@ protected:
}
}
protected:
friend class ParallelFormattingOutputFormat;
void writeSuffixIfNot()
{
if (need_write_suffix)
@ -106,6 +105,15 @@ protected:
}
}
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void finalizeBuffers() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
virtual void resetFormatterImpl() {}
/// Methods-helpers for parallel formatting.
/// Set the number of rows that was already read in

View File

@ -10,12 +10,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_)
IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_)
: IOutputFormat(header, out_)
, num_columns(header.columns())
, types(header.getDataTypes())
, serializations(header.getSerializations())
, params(params_)
{
}
@ -30,10 +29,6 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
writeRowBetweenDelimiter();
write(columns, row);
if (params.callback)
params.callback(columns, row);
first_row = false;
}
}

View File

@ -9,14 +9,6 @@
namespace DB
{
struct RowOutputFormatParams
{
using WriteCallback = std::function<void(const Columns & columns,size_t row)>;
// Callback used to indicate that another row is written.
WriteCallback callback;
};
class WriteBuffer;
/** Output format that writes data row by row.
@ -24,16 +16,17 @@ class WriteBuffer;
class IRowOutputFormat : public IOutputFormat
{
public:
using Params = RowOutputFormatParams;
/// Used to work with IRowOutputFormat explicitly.
void writeRow(const Columns & columns, size_t row_num)
{
first_row = false;
write(columns, row_num);
}
/** Write a row.
* Default implementation calls methods to write single values and delimiters
* (except delimiter between rows (writeRowBetweenDelimiter())).
*/
virtual void write(const Columns & columns, size_t row_num);
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
protected:
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
IRowOutputFormat(const Block & header, WriteBuffer & out_);
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
@ -41,6 +34,11 @@ protected:
virtual bool supportTotals() const { return false; }
virtual bool supportExtremes() const { return false; }
/** Write a row.
* Default implementation calls methods to write single values and delimiters
* (except delimiter between rows (writeRowBetweenDelimiter())).
*/
virtual void write(const Columns & columns, size_t row_num);
virtual void writeMinExtreme(const Columns & columns, size_t row_num);
virtual void writeMaxExtreme(const Columns & columns, size_t row_num);
virtual void writeTotals(const Columns & columns, size_t row_num);
@ -52,7 +50,6 @@ protected:
virtual void writeFieldDelimiter() {} /// delimiter between values
virtual void writeRowStartDelimiter() {} /// delimiter before each row
virtual void writeRowEndDelimiter() {} /// delimiter after each row
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
virtual void writePrefix() override {} /// delimiter before resultset
virtual void writeSuffix() override {} /// delimiter after resultset
virtual void writeBeforeTotals() {}
@ -66,7 +63,6 @@ protected:
size_t num_columns;
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
};

View File

@ -21,7 +21,6 @@ ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block &
: IOutputFormat(header_, out_)
, stream{stream_}
, format_settings{format_settings_}
, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
{
}
@ -65,8 +64,15 @@ void ArrowBlockOutputFormat::finalizeImpl()
"Error while closing a table: {}", status.ToString());
}
void ArrowBlockOutputFormat::resetFormatterImpl()
{
writer.reset();
arrow_ostream.reset();
}
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
arrow_ostream = std::make_shared<ArrowBufferedOutputStream>(out);
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
// TODO: should we use arrow::ipc::IpcOptions::alignment?
@ -88,7 +94,6 @@ void registerOutputFormatArrow(FormatFactory & factory)
"Arrow",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
@ -99,7 +104,6 @@ void registerOutputFormatArrow(FormatFactory & factory)
"ArrowStream",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);

View File

@ -27,6 +27,7 @@ public:
private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);

View File

@ -902,11 +902,15 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_)
, schema_registry(getConfluentSchemaRegistry(format_settings_))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(*in))
, decoder(avro::binaryDecoder())
, format_settings(format_settings_)
{
}
void AvroConfluentRowInputFormat::readPrefix()
{
input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
decoder = avro::binaryDecoder();
decoder->init(*input_stream);
}

View File

@ -163,6 +163,7 @@ public:
private:
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;

View File

@ -436,8 +436,8 @@ static avro::Codec getCodec(const std::string & codec_name)
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_)
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings))
{
@ -471,67 +471,24 @@ void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
file_writer_ptr->incr();
}
void AvroRowOutputFormat::writeSuffix()
void AvroRowOutputFormat::finalizeImpl()
{
file_writer_ptr->close();
}
void AvroRowOutputFormat::resetFormatterImpl()
{
file_writer_ptr.reset();
}
void AvroRowOutputFormat::consume(DB::Chunk chunk)
{
if (params.callback)
consumeImplWithCallback(std::move(chunk));
else
consumeImpl(std::move(chunk));
}
void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows; ++row)
{
write(columns, row);
}
}
void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows;)
{
size_t current_row = row;
/// used by WriteBufferToKafkaProducer to obtain auxiliary data
/// from the starting row of a file
writePrefixIfNot();
for (size_t row_in_file = 0;
row_in_file < settings.avro.output_rows_in_file && row < num_rows;
++row, ++row_in_file)
{
write(columns, row);
}
file_writer_ptr->flush();
writeSuffix();
need_write_prefix = true;
params.callback(columns, current_row);
}
}
void registerOutputFormatAvro(FormatFactory & factory)
{
factory.registerOutputFormat("Avro", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<AvroRowOutputFormat>(buf, sample, settings);
});
factory.markFormatHasNoAppendSupport("Avro");
}

View File

@ -46,27 +46,23 @@ private:
class AvroRowOutputFormat final : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
void consume(Chunk) override;
String getName() const override { return "AvroRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
virtual void writePrefix() override;
virtual void writeSuffix() override;
virtual void finalizeImpl() override;
virtual void resetFormatterImpl() override;
void createFileWriter();
FormatSettings settings;
AvroSerializer serializer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;
void consumeImpl(Chunk);
void consumeImplWithCallback(Chunk);
};
}

View File

@ -10,8 +10,8 @@
namespace DB
{
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_)
: IRowOutputFormat(header, out_, params_), with_names(with_names_), with_types(with_types_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_)
: IRowOutputFormat(header, out_), with_names(with_names_), with_types(with_types_)
{
}
@ -55,10 +55,9 @@ void registerOutputFormatRowBinary(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, params);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};

View File

@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat final: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_);
String getName() const override { return "BinaryRowOutputFormat"; }

View File

@ -9,8 +9,8 @@ namespace DB
{
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@ -102,10 +102,9 @@ void registerOutputFormatCSV(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, params, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};

View File

@ -20,7 +20,7 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types, const FormatSettings & format_settings_);
String getName() const override { return "CSVRowOutputFormat"; }

View File

@ -42,10 +42,9 @@ void CapnProtoOutputStream::write(const void * buffer, size_t size)
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
: IRowOutputFormat(header_, out_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
{
schema = schema_parser.getMessageSchema(info);
checkCapnProtoSchemaStructure(schema, getPort(PortKind::Main).getHeader(), format_settings.capn_proto.enum_comparing_mode);
@ -248,10 +247,9 @@ void registerOutputFormatCapnProto(FormatFactory & factory)
factory.registerOutputFormat("CapnProto", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, params, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
});
}

View File

@ -29,7 +29,6 @@ public:
CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_);

View File

@ -8,8 +8,8 @@ namespace DB
{
CustomSeparatedRowOutputFormat::CustomSeparatedRowOutputFormat(
const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_, params_)
const Block & header_, WriteBuffer & out_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_)
, with_names(with_names_)
, with_types(with_types_)
, format_settings(format_settings_)
@ -84,10 +84,9 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, params, settings, with_names, with_types);
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, settings, with_names, with_types);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -11,7 +11,7 @@ class WriteBuffer;
class CustomSeparatedRowOutputFormat final : public IRowOutputFormat
{
public:
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
String getName() const override { return "CustomSeparatedRowOutputFormat"; }

View File

@ -1,4 +1,5 @@
#include <Processors/Formats/Impl/HiveTextRowInputFormat.h>
#include <Common/assert_cast.h>
#if USE_HIVE
@ -31,12 +32,18 @@ HiveTextRowInputFormat::HiveTextRowInputFormat(
HiveTextRowInputFormat::HiveTextRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_, const FormatSettings & format_settings_)
: CSVRowInputFormat(
header_, *buf_, params_, true, false, format_settings_, std::make_unique<HiveTextFormatReader>(std::move(buf_), format_settings_))
header_, *buf_, params_, true, false, format_settings_, std::make_unique<HiveTextFormatReader>(*buf_, format_settings_)), buf(std::move(buf_))
{
}
HiveTextFormatReader::HiveTextFormatReader(std::unique_ptr<PeekableReadBuffer> buf_, const FormatSettings & format_settings_)
: CSVFormatReader(*buf_, format_settings_), buf(std::move(buf_)), input_field_names(format_settings_.hive_text.input_field_names)
void HiveTextRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
CSVRowInputFormat::setReadBuffer(*buf);
}
HiveTextFormatReader::HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_)
: CSVFormatReader(buf_, format_settings_), buf(&buf_), input_field_names(format_settings_.hive_text.input_field_names)
{
}
@ -53,6 +60,12 @@ std::vector<String> HiveTextFormatReader::readTypes()
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "HiveTextRowInputFormat::readTypes is not implemented");
}
void HiveTextFormatReader::setReadBuffer(ReadBuffer & buf_)
{
buf = assert_cast<PeekableReadBuffer *>(&buf_);
CSVFormatReader::setReadBuffer(buf_);
}
void registerInputFormatHiveText(FormatFactory & factory)
{
factory.registerInputFormat(

View File

@ -18,21 +18,27 @@ public:
String getName() const override { return "HiveTextRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
HiveTextRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_, const FormatSettings & format_settings_);
std::unique_ptr<PeekableReadBuffer> buf;
};
class HiveTextFormatReader final : public CSVFormatReader
{
public:
HiveTextFormatReader(std::unique_ptr<PeekableReadBuffer> buf_, const FormatSettings & format_settings_);
HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_);
std::vector<String> readNames() override;
std::vector<String> readTypes() override;
void setReadBuffer(ReadBuffer & buf_) override;
private:
std::unique_ptr<PeekableReadBuffer> buf;
PeekableReadBuffer * buf;
std::vector<String> input_field_names;
};

View File

@ -34,7 +34,6 @@ void registerOutputFormatJSONColumns(FormatFactory & factory)
factory.registerOutputFormat("JSONColumns", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsBlockOutputFormat>(buf, sample, format_settings, format_settings.json.validate_utf8);

View File

@ -28,7 +28,6 @@ void JSONColumnsBlockOutputFormatBase::consume(Chunk chunk)
void JSONColumnsBlockOutputFormatBase::writeSuffix()
{
writeChunk(mono_chunk);
mono_chunk.clear();
}

View File

@ -95,12 +95,18 @@ void JSONColumnsWithMetadataBlockOutputFormat::finalizeImpl()
ostr->next();
}
void JSONColumnsWithMetadataBlockOutputFormat::resetFormatterImpl()
{
JSONColumnsBlockOutputFormat::resetFormatterImpl();
rows = 0;
statistics = Statistics();
}
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory)
{
factory.registerOutputFormat("JSONColumnsWithMetadata", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsWithMetadataBlockOutputFormat>(buf, sample, format_settings);

View File

@ -46,8 +46,6 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit_) override { statistics.rows_before_limit = rows_before_limit_; statistics.applied_limit = true; }
void onProgress(const Progress & progress_) override { statistics.progress.incrementPiecewiseAtomically(progress_); }
WriteBuffer & getWriteBuffer() const override { return *ostr; }
protected:
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
@ -55,6 +53,7 @@ protected:
void writePrefix() override;
void writeSuffix() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void writeChunkStart() override;
void writeChunkEnd() override;

View File

@ -32,7 +32,6 @@ void registerOutputFormatJSONCompactColumns(FormatFactory & factory)
factory.registerOutputFormat("JSONCompactColumns", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactColumnsBlockOutputFormat>(buf, sample, format_settings);

View File

@ -1,7 +1,6 @@
#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
#include <IO/ReadHelpers.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/Operators.h>
#include <Formats/FormatFactory.h>
#include <Formats/verbosePrintString.h>

View File

@ -11,12 +11,11 @@ namespace DB
JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool with_types_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_)
, settings(settings_)
, with_names(with_names_)
, with_types(with_types_)
@ -130,10 +129,9 @@ void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
factory.registerOutputFormat(format_name, [yield_strings, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, with_names, with_types, yield_strings);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, format_settings, with_names, with_types, yield_strings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -17,7 +17,6 @@ public:
JSONCompactEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool with_types_,

View File

@ -11,10 +11,9 @@ namespace DB
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: JSONRowOutputFormat(out_, header, params_, settings_, yield_strings_)
: JSONRowOutputFormat(out_, header, settings_, yield_strings_)
{
}
@ -72,10 +71,9 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
factory.registerOutputFormat("JSONCompact", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompact");
@ -83,10 +81,9 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
factory.registerOutputFormat("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, true);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStrings");

View File

@ -19,7 +19,6 @@ public:
JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);

View File

@ -277,6 +277,7 @@ void JSONEachRowRowInputFormat::resetParser()
read_columns.clear();
seen_columns.clear();
prev_positions.clear();
allow_new_rows = true;
}
void JSONEachRowRowInputFormat::readPrefix()

View File

@ -12,9 +12,8 @@ namespace DB
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_),
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_),
settings(settings_)
{
fields = JSONUtils::makeNamesValidJSONStrings(getPort(PortKind::Main).getHeader().getNames(), settings, settings.json.validate_utf8);
@ -81,13 +80,11 @@ void registerOutputFormatJSONEachRow(FormatFactory & factory)
factory.registerOutputFormat(format, [serialize_as_strings](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = serialize_as_strings;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting(format);
};

View File

@ -17,7 +17,6 @@ public:
JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONEachRowRowOutputFormat"; }

View File

@ -76,25 +76,21 @@ void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
factory.registerOutputFormat("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, settings);
});
factory.registerOutputFormat("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = true;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, settings);
});
}

View File

@ -6,8 +6,8 @@
namespace DB
{
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, params_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
{
}
@ -71,12 +71,11 @@ void registerOutputFormatJSONObjectEachRow(FormatFactory & factory)
factory.registerOutputFormat("JSONObjectEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONObjectEachRow");
factory.markFormatHasNoAppendSupport("JSONObjectEachRow");

View File

@ -23,7 +23,6 @@ public:
JSONObjectEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONObjectEachRowRowOutputFormat"; }

View File

@ -11,10 +11,9 @@ namespace DB
JSONRowOutputFormat::JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_, params_), settings(settings_), yield_strings(yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_), settings(settings_), yield_strings(yield_strings_)
{
names = JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, true);
}
@ -130,6 +129,13 @@ void JSONRowOutputFormat::finalizeImpl()
ostr->next();
}
void JSONRowOutputFormat::resetFormatterImpl()
{
RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
row_count = 0;
statistics = Statistics();
}
void JSONRowOutputFormat::onProgress(const Progress & value)
{
@ -142,10 +148,9 @@ void registerOutputFormatJSON(FormatFactory & factory)
factory.registerOutputFormat("JSON", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
@ -154,10 +159,9 @@ void registerOutputFormatJSON(FormatFactory & factory)
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
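
The same resetFormatterImpl() pattern recurs across the formats touched by this commit: clear all per-output state (row counters, accumulated statistics, cached writers) so a single formatter instance can emit several independent outputs in a row. A condensed sketch of the contract, using hypothetical member names that mirror the ones above:

class MyRowOutputFormat : public DB::RowOutputFormatWithUTF8ValidationAdaptor
{
protected:
    void resetFormatterImpl() override
    {
        /// Let the base class recreate its validating write buffer first.
        DB::RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
        row_count = 0;                 /// the next output counts rows from scratch
        statistics = DB::Statistics(); /// drop accumulated totals, extremes and progress
    }

    size_t row_count = 0;
    DB::Statistics statistics;
};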

View File

@ -19,7 +19,6 @@ public:
JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
@ -35,8 +34,6 @@ public:
statistics.rows_before_limit = rows_before_limit_;
}
WriteBuffer & getWriteBuffer() const override { return *ostr; }
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
@ -59,6 +56,7 @@ protected:
void writeAfterExtremes() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);

View File

@ -5,8 +5,8 @@
namespace DB
{
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_) {}
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_) {}
void MarkdownRowOutputFormat::writePrefix()
{
@ -71,10 +71,9 @@ void registerOutputFormatMarkdown(FormatFactory & factory)
factory.registerOutputFormat("Markdown", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Markdown");

View File

@ -12,7 +12,7 @@ class ReadBuffer;
class MarkdownRowOutputFormat final : public IRowOutputFormat
{
public:
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "MarkdownRowOutputFormat"; }

View File

@ -32,8 +32,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), packer(out_), format_settings(format_settings_) {}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), packer(out_), format_settings(format_settings_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
@ -226,10 +226,9 @@ void registerOutputFormatMsgPack(FormatFactory & factory)
factory.registerOutputFormat("MsgPack", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("MsgPack");
}

View File

@ -17,7 +17,7 @@ namespace DB
class MsgPackRowOutputFormat final : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "MsgPackRowOutputFormat"; }

View File

@ -106,7 +106,6 @@ void registerOutputFormatMySQLWire(FormatFactory & factory)
"MySQLWire",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
}

View File

@ -115,7 +115,6 @@ void registerOutputFormatNative(FormatFactory & factory)
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeOutputFormat>(buf, sample);

View File

@ -13,7 +13,6 @@ void registerOutputFormatNull(FormatFactory & factory)
factory.registerOutputFormat("Null", [](
WriteBuffer &,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NullOutputFormat>(sample);

View File

@ -100,7 +100,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatODBCDriver2(FormatFactory & factory)
{
factory.registerOutputFormat(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
});

View File

@ -515,6 +515,11 @@ void ORCBlockOutputFormat::finalizeImpl()
writer->close();
}
void ORCBlockOutputFormat::resetFormatterImpl()
{
writer.reset();
}
void ORCBlockOutputFormat::prepareWriter()
{
const Block & header = getPort(PortKind::Main).getHeader();
@ -531,7 +536,6 @@ void registerOutputFormatORC(FormatFactory & factory)
factory.registerOutputFormat("ORC", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);

View File

@ -44,6 +44,7 @@ public:
private:
void consume(Chunk chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
std::unique_ptr<orc::Type> getORCType(const DataTypePtr & type);

View File

@ -20,6 +20,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/**
* ORDER-PRESERVING parallel formatting of data formats.
* The idea is similar to ParallelParsingInputFormat.
@ -167,6 +172,12 @@ private:
void finalizeImpl() override;
void resetFormatterImpl() override
{
/// Resetting parallel formatting is not obvious and it's not used anywhere
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method resetFormatterImpl is not implemented for parallel formatting");
}
InternalFormatterCreator internal_formatter_creator;
/// Status to synchronize multiple threads.

View File

@ -74,13 +74,17 @@ void ParquetBlockOutputFormat::finalizeImpl()
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
void ParquetBlockOutputFormat::resetFormatterImpl()
{
file_writer.reset();
}
void registerOutputFormatParquet(FormatFactory & factory)
{
factory.registerOutputFormat(
"Parquet",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);

View File

@ -36,6 +36,7 @@ public:
private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
const FormatSettings format_settings;

View File

@ -66,7 +66,6 @@ void registerOutputFormatPostgreSQLWire(FormatFactory & factory)
"PostgreSQLWire",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<PostgreSQLOutputFormat>(buf, sample, settings); });
}
}

View File

@ -54,6 +54,11 @@ protected:
const IColumn & column, const ISerialization & serialization, size_t row_num,
size_t value_width, size_t pad_to_width, bool align_right);
void resetFormatterImpl() override
{
total_rows = 0;
}
private:
bool mono_block;
/// For mono_block == true only
@ -68,7 +73,6 @@ void registerPrettyFormatWithNoEscapesAndMonoBlock(FormatFactory & factory, cons
fact.registerOutputFormat(name, [no_escapes, mono_block](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
if (no_escapes)

View File

@ -82,9 +82,8 @@ static Float64 tryParseFloat(const String & s)
PrometheusTextOutputFormat::PrometheusTextOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, string_serialization(DataTypeString().getDefaultSerialization())
, format_settings(format_settings_)
{
@ -339,10 +338,9 @@ void registerOutputFormatPrometheus(FormatFactory & factory)
factory.registerOutputFormat(FORMAT_NAME, [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<PrometheusTextOutputFormat>(buf, sample, params, settings);
return std::make_shared<PrometheusTextOutputFormat>(buf, sample, settings);
});
}

View File

@ -20,7 +20,6 @@ public:
PrometheusTextOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "PrometheusTextOutputFormat"; }

View File

@ -30,6 +30,12 @@ ProtobufListInputFormat::ProtobufListInputFormat(
{
}
void ProtobufListInputFormat::setReadBuffer(ReadBuffer & in_)
{
reader->setReadBuffer(in_);
IRowInputFormat::setReadBuffer(in_);
}
bool ProtobufListInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension)
{
if (reader->eof())

View File

@ -33,6 +33,8 @@ public:
String getName() const override { return "ProtobufListInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;

View File

@ -13,10 +13,9 @@ namespace DB
ProtobufListOutputFormat::ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
@ -42,18 +41,21 @@ void ProtobufListOutputFormat::finalizeImpl()
serializer->finalizeWrite();
}
void ProtobufListOutputFormat::resetFormatterImpl()
{
serializer->reset();
}
void registerOutputFormatProtobufList(FormatFactory & factory)
{
factory.registerOutputFormat(
"ProtobufList",
[](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufListOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings, "Protobuf", true),
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
settings.protobuf.output_nullables_with_google_wrappers);
});
}

View File

@ -26,7 +26,6 @@ public:
ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_);
@ -39,6 +38,7 @@ private:
void writeField(const IColumn &, const ISerialization &, size_t) override {}
void finalizeImpl() override;
void resetFormatterImpl() override;
std::unique_ptr<ProtobufWriter> writer;
std::unique_ptr<ProtobufSerializer> serializer;

View File

@ -44,6 +44,12 @@ bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
return true;
}
void ProtobufRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
reader->setReadBuffer(in_);
IRowInputFormat::setReadBuffer(in_);
}
bool ProtobufRowInputFormat::allowSyncAfterError() const
{
return true;

View File

@ -38,6 +38,8 @@ public:
String getName() const override { return "ProtobufRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;
bool allowSyncAfterError() const override;

View File

@ -20,11 +20,10 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
@ -59,14 +58,11 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
with_length_delimiter ? "Protobuf" : "ProtobufSingle",
[with_length_delimiter](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings, "Protobuf", true),
settings,
with_length_delimiter);
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
settings, with_length_delimiter);
});
}
}

View File

@ -30,7 +30,6 @@ public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_);

View File

@ -8,9 +8,8 @@ namespace DB
RawBLOBRowOutputFormat::RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_)
: IRowOutputFormat(header_, out_, params_)
const Block & header_)
: IRowOutputFormat(header_, out_)
{
}
@ -30,10 +29,9 @@ void registerOutputFormatRawBLOB(FormatFactory & factory)
factory.registerOutputFormat("RawBLOB", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, params);
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample);
});
}

View File

@ -29,8 +29,7 @@ class RawBLOBRowOutputFormat final : public IRowOutputFormat
public:
RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_);
const Block & header_);
String getName() const override { return "RawBLOBRowOutputFormat"; }

View File

@ -5,8 +5,8 @@
namespace DB
{
SQLInsertRowOutputFormat::SQLInsertRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), column_names(header_.getNames()), format_settings(format_settings_)
SQLInsertRowOutputFormat::SQLInsertRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), column_names(header_.getNames()), format_settings(format_settings_)
{
}
@ -88,16 +88,19 @@ void SQLInsertRowOutputFormat::writeSuffix()
writeChar('\n', out);
}
void SQLInsertRowOutputFormat::resetFormatterImpl()
{
rows_in_line = 0;
}
void registerOutputFormatSQLInsert(FormatFactory & factory)
{
factory.registerOutputFormat("SQLInsert", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<SQLInsertRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<SQLInsertRowOutputFormat>(buf, sample, settings);
});
}

View File

@ -16,7 +16,6 @@ public:
SQLInsertRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "SQLInsertRowOutputFormat"; }
@ -26,11 +25,12 @@ public:
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
virtual void writeFieldDelimiter() override;
virtual void writeRowStartDelimiter() override;
virtual void writeRowEndDelimiter() override;
virtual void writeRowBetweenDelimiter() override;
virtual void writeSuffix() override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
void writeSuffix() override;
void resetFormatterImpl() override;
void printLineStart();
void printColumnNames();

View File

@ -7,8 +7,8 @@
namespace DB
{
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, false, params_, format_settings_), fields(header.getNamesAndTypes())
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, false, format_settings_), fields(header.getNamesAndTypes())
{
for (auto & field : fields)
{
@ -45,10 +45,9 @@ void registerOutputFormatTSKV(FormatFactory & factory)
factory.registerOutputFormat("TSKV", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<TSKVRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("TSKV");
}

View File

@ -14,7 +14,7 @@ namespace DB
class TSKVRowOutputFormat final : public TabSeparatedRowOutputFormat
{
public:
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings);
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings);
String getName() const override { return "TSKVRowOutputFormat"; }

View File

@ -12,9 +12,8 @@ TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
bool with_names_,
bool with_types_,
bool is_raw_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), is_raw(is_raw_), format_settings(format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), is_raw(is_raw_), format_settings(format_settings_)
{
}
@ -108,10 +107,9 @@ void registerOutputFormatTabSeparated(FormatFactory & factory)
factory.registerOutputFormat(format_name, [is_raw, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, with_names, with_types, is_raw, params, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, with_names, with_types, is_raw, settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -25,7 +25,6 @@ public:
bool with_names_,
bool with_types_,
bool is_raw_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "TabSeparatedRowOutputFormat"; }

View File

@ -133,9 +133,6 @@ void TemplateBlockOutputFormat::writePrefix()
void TemplateBlockOutputFormat::finalizeImpl()
{
if (finalized)
return;
size_t parts = format.format_idx_to_column_idx.size();
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
@ -184,17 +181,19 @@ void TemplateBlockOutputFormat::finalizeImpl()
}
writeString(format.delimiters[i + 1], out);
}
finalized = true;
}
void TemplateBlockOutputFormat::resetFormatterImpl()
{
row_count = 0;
statistics = Statistics();
}
void registerOutputFormatTemplate(FormatFactory & factory)
{
factory.registerOutputFormat("Template", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format;

View File

@ -44,6 +44,7 @@ private:
void consumeTotals(Chunk chunk) override { statistics.totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { statistics.extremes = std::move(chunk); }
void finalizeImpl() override;
void resetFormatterImpl() override;
void writeRow(const Chunk & chunk, size_t row_num);
template <typename U, typename V> void writeValue(U value, EscapingRule escaping_rule);

View File

@ -267,6 +267,7 @@ void TemplateRowInputFormat::resetParser()
void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
format_reader->setReadBuffer(*buf);
IInputFormat::setReadBuffer(*buf);
}

View File

@ -10,8 +10,8 @@ namespace DB
{
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_)
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_)
{
}
@ -46,10 +46,9 @@ void registerOutputFormatValues(FormatFactory & factory)
factory.registerOutputFormat("Values", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<ValuesRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Values");

View File

@ -15,7 +15,7 @@ class WriteBuffer;
class ValuesRowOutputFormat final : public IRowOutputFormat
{
public:
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "ValuesRowOutputFormat"; }

View File

@ -11,8 +11,8 @@ namespace DB
{
VerticalRowOutputFormat::VerticalRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_)
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@ -160,10 +160,9 @@ void registerOutputFormatVertical(FormatFactory & factory)
factory.registerOutputFormat("Vertical", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<VerticalRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<VerticalRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Vertical");

View File

@ -18,7 +18,7 @@ class Context;
class VerticalRowOutputFormat final : public IRowOutputFormat
{
public:
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "VerticalRowOutputFormat"; }
@ -45,6 +45,11 @@ private:
/// For totals and extremes.
void writeSpecialRow(const Columns & columns, size_t row_num, const char * title);
void resetFormatterImpl() override
{
row_number = 0;
}
const FormatSettings format_settings;
size_t field_number = 0;
size_t row_number = 0;

View File

@ -7,8 +7,8 @@
namespace DB
{
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header_, out_, params_), fields(header_.getNamesAndTypes()), format_settings(format_settings_)
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header_, out_), fields(header_.getNamesAndTypes()), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
field_tag_names.resize(sample.columns());
@ -207,6 +207,13 @@ void XMLRowOutputFormat::finalizeImpl()
ostr->next();
}
void XMLRowOutputFormat::resetFormatterImpl()
{
RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
row_count = 0;
statistics = Statistics();
}
void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
if (statistics.applied_limit)
@ -238,10 +245,9 @@ void registerOutputFormatXML(FormatFactory & factory)
factory.registerOutputFormat("XML", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<XMLRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<XMLRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("XML");

View File

@ -16,12 +16,10 @@ namespace DB
class XMLRowOutputFormat final : public RowOutputFormatWithUTF8ValidationAdaptor
{
public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "XMLRowOutputFormat"; }
WriteBuffer & getWriteBuffer() const override { return *ostr; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeRowStartDelimiter() override;
@ -30,6 +28,7 @@ private:
void writePrefix() override;
void writeSuffix() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void writeMinExtreme(const Columns & columns, size_t row_num) override;
void writeMaxExtreme(const Columns & columns, size_t row_num) override;

View File

@ -40,6 +40,18 @@ public:
this->out.next();
}
void finalizeBuffers() override
{
if (validating_ostr)
validating_ostr->finalize();
}
void resetFormatterImpl() override
{
validating_ostr = std::make_unique<WriteBufferValidUTF8>(this->out);
ostr = validating_ostr.get();
}
protected:
/// Points to validating_ostr or to out from IOutputFormat; derived classes should use it instead of out.
WriteBuffer * ostr;
@ -50,7 +62,7 @@ private:
};
using OutputFormatWithUTF8ValidationAdaptor = OutputFormatWithUTF8ValidationAdaptorBase<IOutputFormat>;
using RowOutputFormatWithUTF8ValidationAdaptor = OutputFormatWithUTF8ValidationAdaptorBase<IRowOutputFormat, const IRowOutputFormat::Params &>;
using RowOutputFormatWithUTF8ValidationAdaptor = OutputFormatWithUTF8ValidationAdaptorBase<IRowOutputFormat>;
}
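
For context, a minimal sketch of the wrapper this adaptor manages (exact constructor defaults are an assumption): every write passes through WriteBufferValidUTF8, which substitutes replacement characters for invalid byte sequences, so resetting the formatter simply rebuilds that wrapper over the underlying buffer, exactly as resetFormatterImpl() above does.

#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferValidUTF8.h>

std::string validated(const std::string & raw)
{
    DB::WriteBufferFromOwnString out;
    {
        DB::WriteBufferValidUTF8 validating(out);
        validating.write(raw.data(), raw.size()); /// invalid sequences are replaced
        validating.finalize();                    /// flush everything into `out`
    }
    return out.str();
}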

View File

@ -102,8 +102,7 @@ try
auto pipeline = QueryPipeline(std::move(input_format));
auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
RowOutputFormatParams out_params;
OutputFormatPtr output_format = std::make_shared<CSVRowOutputFormat>(out_buf, sample, true, true, out_params, format_settings);
OutputFormatPtr output_format = std::make_shared<CSVRowOutputFormat>(out_buf, sample, true, true, format_settings);
Block res;
while (reader->pull(res))
{

View File

@ -0,0 +1,37 @@
#include <Storages/IMessageProducer.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/logger_useful.h>
namespace DB
{
void ConcurrentMessageProducer::start(const ContextPtr & context)
{
initialize();
producing_task = context->getSchedulePool().createTask(getProducingTaskName(), [this]
{
producingTask();
/// Notify that the producing task is finished.
task_finished.store(true);
task_finished.notify_all();
});
producing_task->activateAndSchedule();
}
void ConcurrentMessageProducer::finish()
{
LOG_DEBUG(&Poco::Logger::get("ConcurrentMessageProducer"), "finish");
/// We should execute finish logic only once.
if (finished.exchange(true))
return;
stopProducingTask();
/// Wait until the producing task is finished.
task_finished.wait(false);
producing_task->deactivate();
finishImpl();
}
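
The finish() sequence above relies on C++20 atomic wait/notify: the producing task flips task_finished and notifies, while finish() blocks in wait(false) until the flag leaves its initial value. A self-contained illustration of that handshake:

#include <atomic>
#include <thread>

int main()
{
    std::atomic<bool> task_finished = false;

    std::thread producing_task([&]
    {
        /// ... produce messages ...
        task_finished.store(true);
        task_finished.notify_all();
    });

    task_finished.wait(false); /// returns once task_finished != false
    producing_task.join();
}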
}

View File

@ -0,0 +1,67 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Interpreters/Context.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{
/// Interface for producing messages in streaming storages.
/// It's used in MessageQueueSink.
class IMessageProducer
{
public:
/// Do some preparations.
virtual void start(const ContextPtr & context) = 0;
/// Produce single message.
virtual void produce(const String & message, size_t rows_in_message, const Columns & columns, size_t last_row) = 0;
/// Finalize producer.
virtual void finish() = 0;
virtual ~IMessageProducer() = default;
};
/// Implements the interface for concurrent message producing.
class ConcurrentMessageProducer : public IMessageProducer
{
public:
/// Create and schedule a task in BackgroundSchedulePool that will produce messages.
void start(const ContextPtr & context) override;
/// Stop the producing task, wait for it to finish and finalize.
void finish() override;
/// In this method the producer should not do any hard work; it should only hand the message
/// over to the producing task, for example, through a ConcurrentBoundedQueue.
void produce(const String & message, size_t rows_in_message, const Columns & columns, size_t last_row) override = 0;
protected:
/// Do some initialization before scheduling the producing task.
virtual void initialize() {}
/// Tell the producer to finish all work and stop the producing task.
virtual void stopProducingTask() = 0;
/// Do some finalization after the producing task is stopped.
virtual void finishImpl() {}
virtual String getProducingTaskName() const = 0;
/// Method that is called inside the producing task; all producing work should be done here.
virtual void producingTask() = 0;
private:
/// Flag indicating that the finish() method was called.
/// It's used to prevent running the finish logic more than once.
std::atomic<bool> finished = false;
/// Flag indicating that the producing task has finished.
/// It's used to wait until the producing task is finished.
std::atomic<bool> task_finished = false;
BackgroundSchedulePool::TaskHolder producing_task;
};
}
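
A hedged sketch of how a concrete producer could plug into this interface. The queue below is a plain mutex-and-condition-variable structure to keep the example self-contained; a real implementation would more likely use ClickHouse's ConcurrentBoundedQueue, and sendToBroker() is a made-up stand-in for the client-library call:

#include <condition_variable>
#include <deque>
#include <mutex>

class ExampleMessageProducer : public DB::ConcurrentMessageProducer
{
public:
    void produce(const DB::String & message, size_t, const DB::Columns &, size_t) override
    {
        std::lock_guard lock(mutex);
        queue.push_back(message);
        cv.notify_one();
    }

protected:
    DB::String getProducingTaskName() const override { return "ExampleMessageProducer"; }

    void stopProducingTask() override
    {
        std::lock_guard lock(mutex);
        stopped = true;
        cv.notify_one();
    }

    /// Runs inside the BackgroundSchedulePool task scheduled by start().
    void producingTask() override
    {
        while (true)
        {
            std::unique_lock lock(mutex);
            cv.wait(lock, [this] { return stopped || !queue.empty(); });
            if (queue.empty())
                return; /// stopProducingTask() was called and the queue is drained
            DB::String message = std::move(queue.front());
            queue.pop_front();
            lock.unlock();
            sendToBroker(message); /// the actual network call would go here
        }
    }

private:
    void sendToBroker(const DB::String &) { /* client-specific */ }

    std::mutex mutex;
    std::condition_variable cv;
    std::deque<DB::String> queue;
    bool stopped = false;
};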

View File

@ -1,14 +0,0 @@
#pragma once
#include <memory>
namespace DB
{
class ReadBufferFromKafkaConsumer;
class WriteBufferToKafkaProducer;
using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromKafkaConsumer>;
using ProducerBufferPtr = std::shared_ptr<WriteBufferToKafkaProducer>;
}

Some files were not shown because too many files have changed in this diff.