Merge pull request #42777 from Avogar/improve-streaming-engines

Refactor and Improve streaming engines Kafka/RabbitMQ/NATS and data formats
This commit is contained in:
Kruglov Pavel 2023-01-02 15:59:06 +01:00 committed by GitHub
commit 966f57ef68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
176 changed files with 2946 additions and 1846 deletions

View File

@ -34,7 +34,14 @@ SETTINGS
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
```
Required parameters:
@ -46,13 +53,20 @@ Required parameters:
Optional parameters:
- `kafka_row_delimiter` — Delimiter character, which ends the message.
- `kafka_row_delimiter` — Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
- `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer (default: `0`). When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block).
- `kafka_num_consumers` — The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed. Default: `1`.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block. Default: `0`.
- `kafka_client_id` — Client identifier. Empty by default.
- `kafka_poll_timeout_ms` — Timeout for single poll from Kafka. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default, stream.
- `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
- `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`.
Examples:
@ -94,7 +108,7 @@ Do not use this method in new projects. If possible, switch old projects to the
``` sql
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
```
</details>
@ -193,6 +207,14 @@ Example:
- `_headers.name` — Array of message's headers keys.
- `_headers.value` — Array of message's headers values.
## Data formats support {#data-formats-support}
Kafka engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one Kafka message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one Kafka message can be controlled by setting `kafka_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
**See Also**
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

View File

@ -37,8 +37,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[nats_max_block_size = N,]
[nats_flush_interval_ms = N,]
[nats_username = 'user',]
[nats_password = 'password']
[redis_password = 'clickhouse']
[nats_password = 'password',]
[nats_token = 'clickhouse',]
[nats_startup_connect_tries = '5']
[nats_max_rows_per_message = 1]
```
Required parameters:
@ -49,7 +51,7 @@ Required parameters:
Optional parameters:
- `nats_row_delimiter` Delimiter character, which ends the message.
- `nats_row_delimiter` Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
- `nats_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `nats_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `nats_queue_group` Name for queue group of NATS subscribers. Default is the table name.
@ -57,11 +59,13 @@ Optional parameters:
- `nats_reconnect_wait` Amount of time in milliseconds to sleep between each reconnect attempt. Default: `5000`.
- `nats_server_list` - Server list for connection. Can be specified to connect to NATS cluster.
- `nats_skip_broken_messages` - NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` - Number of row collected by poll(s) for flushing data from NATS.
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS.
- `nats_max_block_size` - Number of row collected by poll(s) for flushing data from NATS. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `nats_username` - NATS username.
- `nats_password` - NATS password.
- `nats_token` - NATS auth token.
- `nats_startup_connect_tries` - Number of connect tries at startup. Default: `5`.
- `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. (default : `1`).
SSL connection:
@ -161,4 +165,12 @@ If you want to change the target table by using `ALTER`, we recommend disabling
- `_subject` - NATS message subject.
## Data formats support {#data-formats-support}
NATS engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one NATS message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one NATS message can be controlled by setting `nats_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/nats/) <!--hide-->

View File

@ -37,8 +37,16 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish']
[rabbitmq_flush_interval_ms = N,]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
[rabbitmq_queue_consume = false,]
[rabbitmq_address = '',]
[rabbitmq_vhost = '/',]
[rabbitmq_queue_consume = false,]
[rabbitmq_username = '',]
[rabbitmq_password = '',]
[rabbitmq_commit_on_select = false,]
[rabbitmq_max_rows_per_message = 1]
```
Required parameters:
@ -51,17 +59,25 @@ Optional parameters:
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` A comma-separated list of routing keys.
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_row_delimiter` Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
- `rabbitmq_num_consumers` The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. Default: `1`
- `rabbitmq_num_queues` Total number of queues. Increasing this number can significantly improve performance. Default: `1`.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `rabbitmq_max_block_size` - Number of row collected before flushing data from RabbitMQ. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `rabbitmq_flush_interval_ms` - Timeout for flushing data from RabbitMQ. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `rabbitmq_queue_settings_list` - allows to set RabbitMQ settings when creating a queue. Available settings: `x-max-length`, `x-max-length-bytes`, `x-message-ttl`, `x-expires`, `x-priority`, `x-max-priority`, `x-overflow`, `x-dead-letter-exchange`, `x-queue-type`. The `durable` setting is enabled automatically for the queue.
- `rabbitmq_address` - Address for connection. Use ether this setting or `rabbitmq_host_port`.
- `rabbitmq_vhost` - RabbitMQ vhost. Default: `'\'`.
- `rabbitmq_queue_consume` - Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings. Default: `false`.
- `rabbitmq_username` - RabbitMQ username.
- `rabbitmq_password` - RabbitMQ password.
- `rabbitmq_commit_on_select` - Commit messages when select query is made. Default: `false`.
- `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default : `1`.
SSL connection:
@ -173,4 +189,13 @@ Example:
- `_message_id` - messageID of the received message; non-empty if was set, when message was published.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.
## Data formats support {#data-formats-support}
RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one RabbitMQ message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one RabbitMQ message can be controlled by setting `rabbitmq_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/rabbitmq/) <!--hide-->

View File

@ -1011,6 +1011,12 @@ The default value is 7500.
The smaller the value, the more often data is flushed into the table. Setting the value too low leads to poor performance.
## stream_poll_timeout_ms {#stream_poll_timeout_ms}
Timeout for polling data from/to streaming storages.
Default value: 500.
## load_balancing {#settings-load_balancing}
Specifies the algorithm of replicas selection that is used for distributed query processing.

View File

@ -1497,7 +1497,7 @@ formatRow(format, x, y, ...)
**Returned value**
- A formatted string (for text formats it's usually terminated with the new line character).
- A formatted string. (for text formats it's usually terminated with the new line character).
**Example**
@ -1521,9 +1521,39 @@ Result:
└──────────────────────────────────┘
```
**Note**: If format contains suffix/prefix, it will be written in each row.
**Example**
Query:
``` sql
SELECT formatRow('CustomSeparated', number, 'good')
FROM numbers(3)
SETTINGS format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>'
```
Result:
``` text
┌─formatRow('CustomSeparated', number, 'good')─┐
<prefix>
0 good
<suffix>
<prefix>
1 good
<suffix>
<prefix>
2 good
<suffix>
└──────────────────────────────────────────────┘
```
Note: Only row-based formats are supported in this function.
## formatRowNoNewline
Converts arbitrary expressions into a string via given format. The function trims the last `\n` if any.
Converts arbitrary expressions into a string via given format. Differs from formatRow in that this function trims the last `\n` if any.
**Syntax**

View File

@ -1316,7 +1316,7 @@ formatRow(format, x, y, ...)
**Возвращаемое значение**
- Отформатированная строка (в текстовых форматах обычно с завершающим переводом строки).
- Отформатированная строка. (в текстовых форматах обычно с завершающим переводом строки).
**Пример**
@ -1340,9 +1340,39 @@ FROM numbers(3);
└──────────────────────────────────┘
```
**Примечание**: если формат содержит префикс/суффикс, то он будет записан в каждой строке.
**Пример**
Запрос:
``` sql
SELECT formatRow('CustomSeparated', number, 'good')
FROM numbers(3)
SETTINGS format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>'
```
Результат:
``` text
┌─formatRow('CustomSeparated', number, 'good')─┐
<prefix>
0 good
<suffix>
<prefix>
1 good
<suffix>
<prefix>
2 good
<suffix>
└──────────────────────────────────────────────┘
```
**Примечание**: данная функция поддерживает только строковые форматы вывода.
## formatRowNoNewline {#formatrownonewline}
Преобразует произвольные выражения в строку заданного формата. При этом удаляет лишние переводы строк `\n`, если они появились.
Преобразует произвольные выражения в строку заданного формата. Отличается от функции formatRow тем, что удаляет лишний перевод строки `\n` а конце, если он есть.
**Синтаксис**

View File

@ -1116,6 +1116,8 @@ void ClientBase::onProfileEvents(Block & block)
/// Flush all buffers.
void ClientBase::resetOutput()
{
if (output_format)
output_format->finalize();
output_format.reset();
logs_out_stream.reset();

View File

@ -328,7 +328,6 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
@ -342,9 +341,9 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
if (settings.output_format_parallel_formatting && getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto formatter_creator = [output_getter, sample, callback, format_settings] (WriteBuffer & output) -> OutputFormatPtr
auto formatter_creator = [output_getter, sample, format_settings] (WriteBuffer & output) -> OutputFormatPtr
{
return output_getter(output, sample, {callback}, format_settings);
return output_getter(output, sample, format_settings);
};
ParallelFormattingOutputFormat::Params builder{buf, sample, formatter_creator, settings.max_threads};
@ -357,7 +356,7 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
return format;
}
return getOutputFormat(name, buf, sample, context, callback, _format_settings);
return getOutputFormat(name, buf, sample, context, _format_settings);
}
@ -366,7 +365,6 @@ OutputFormatPtr FormatFactory::getOutputFormat(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
@ -376,15 +374,12 @@ OutputFormatPtr FormatFactory::getOutputFormat(
if (context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);
RowOutputFormatParams params;
params.callback = std::move(callback);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto format = output_getter(buf, sample, params, format_settings);
auto format = output_getter(buf, sample, format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
@ -411,9 +406,8 @@ String FormatFactory::getContentType(
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
Block empty_block;
RowOutputFormatParams empty_params;
WriteBufferFromOwnString empty_buffer;
auto format = output_getter(empty_buffer, empty_block, empty_params, format_settings);
auto format = output_getter(empty_buffer, empty_block, format_settings);
return format->getContentType();
}

View File

@ -30,9 +30,9 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
class IInputFormat;
class IOutputFormat;
class IRowOutputFormat;
struct RowInputFormatParams;
struct RowOutputFormatParams;
class ISchemaReader;
class IExternalSchemaReader;
@ -41,6 +41,7 @@ using ExternalSchemaReaderPtr = std::shared_ptr<IExternalSchemaReader>;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
using RowOutputFormatPtr = std::shared_ptr<IRowOutputFormat>;
template <typename Allocator>
struct Memory;
@ -56,10 +57,6 @@ FormatSettings getFormatSettings(ContextPtr context, const T & settings);
class FormatFactory final : private boost::noncopyable
{
public:
/// This callback allows to perform some additional actions after reading a single row.
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
/** Fast reading data from buffer and save result to memory.
* Reads at least `min_bytes` and some more until the end of the chunk, depends on the format.
* If `max_rows` is non-zero the function also stops after reading the `max_rows` number of rows
@ -72,12 +69,6 @@ public:
size_t min_bytes,
size_t max_rows)>;
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void(
const Columns & columns,
size_t row)>;
private:
using InputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
@ -88,7 +79,6 @@ private:
using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
/// Some input formats can have non trivial readPrefix() and readSuffix(),
@ -153,7 +143,6 @@ public:
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
@ -161,7 +150,6 @@ public:
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & _format_settings = std::nullopt) const;
String getContentType(

View File

@ -37,7 +37,7 @@ namespace
ProtobufReader::ProtobufReader(ReadBuffer & in_)
: in(in_)
: in(&in_)
{
}
@ -153,7 +153,7 @@ bool ProtobufReader::readFieldNumber(int & field_number_)
{
if (current_message_end == END_OF_FILE)
{
if (unlikely(in.eof()))
if (unlikely(in->eof()))
{
current_message_end = cursor;
return false;
@ -282,26 +282,26 @@ void ProtobufReader::readStringAndAppend(PaddedPODArray<UInt8> & str)
void ProtobufReader::readBinary(void* data, size_t size)
{
in.readStrict(reinterpret_cast<char*>(data), size);
in->readStrict(reinterpret_cast<char*>(data), size);
cursor += size;
}
void ProtobufReader::ignore(UInt64 num_bytes)
{
in.ignore(num_bytes);
in->ignore(num_bytes);
cursor += num_bytes;
}
void ProtobufReader::ignoreAll()
{
cursor += in.tryIgnore(std::numeric_limits<size_t>::max());
cursor += in->tryIgnore(std::numeric_limits<size_t>::max());
}
void ProtobufReader::moveCursorBackward(UInt64 num_bytes)
{
if (in.offset() < num_bytes)
if (in->offset() < num_bytes)
throwUnknownFormat();
in.position() -= num_bytes;
in->position() -= num_bytes;
cursor -= num_bytes;
}
@ -313,7 +313,7 @@ UInt64 ProtobufReader::continueReadingVarint(UInt64 first_byte)
# define PROTOBUF_READER_READ_VARINT_BYTE(byteNo) \
do \
{ \
in.readStrict(c); \
in->readStrict(c); \
++cursor; \
if constexpr ((byteNo) < 10) \
{ \
@ -352,7 +352,7 @@ void ProtobufReader::ignoreVarint()
# define PROTOBUF_READER_IGNORE_VARINT_BYTE(byteNo) \
do \
{ \
in.readStrict(c); \
in->readStrict(c); \
++cursor; \
if constexpr ((byteNo) < 10) \
{ \

View File

@ -32,7 +32,9 @@ public:
void readString(String & str);
void readStringAndAppend(PaddedPODArray<UInt8> & str);
bool eof() const { return in.eof(); }
bool eof() const { return in->eof(); }
void setReadBuffer(ReadBuffer & in_) { in = &in_; }
private:
void readBinary(void * data, size_t size);
@ -43,7 +45,7 @@ private:
UInt64 ALWAYS_INLINE readVarint()
{
char c;
in.readStrict(c);
in->readStrict(c);
UInt64 first_byte = static_cast<UInt8>(c);
++cursor;
if (likely(!(c & 0x80)))
@ -56,7 +58,7 @@ private:
void ignoreGroup();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
ReadBuffer * in;
Int64 cursor = 0;
bool root_message_has_length_delimiter = false;
size_t current_message_level = 0;

View File

@ -2511,6 +2511,11 @@ namespace
writer->endMessage(/*with_length_delimiter = */ true);
}
void reset() override
{
first_call_of_write_row = true;
}
void readRow(size_t row_num) override
{
if (first_call_of_read_row)

View File

@ -27,6 +27,7 @@ public:
virtual void setColumns(const ColumnPtr * columns, size_t num_columns) = 0;
virtual void writeRow(size_t row_num) = 0;
virtual void finalizeWrite() {}
virtual void reset() {}
virtual void setColumns(const MutableColumnPtr * columns, size_t num_columns) = 0;
virtual void readRow(size_t row_num) = 0;

View File

@ -29,7 +29,6 @@ namespace
* several columns to generate a string per row, such as CSV, TSV, JSONEachRow, etc.
* formatRowNoNewline(...) trims the newline character of each row.
*/
template <bool no_newline>
class FunctionFormatRow : public IFunction
{
@ -60,8 +59,20 @@ public:
for (auto i = 1u; i < arguments.size(); ++i)
arg_columns.insert(arguments[i]);
materializeBlockInplace(arg_columns);
auto out = FormatFactory::instance().getOutputFormat(format_name, buffer, arg_columns, context, [&](const Columns &, size_t row)
auto format_settings = getFormatSettings(context);
auto out = FormatFactory::instance().getOutputFormat(format_name, buffer, arg_columns, context, format_settings);
/// This function make sense only for row output formats.
auto * row_output_format = dynamic_cast<IRowOutputFormat *>(out.get());
if (!row_output_format)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName());
auto columns = arg_columns.getColumns();
for (size_t i = 0; i != input_rows_count; ++i)
{
row_output_format->writePrefixIfNeeded();
row_output_format->writeRow(columns, i);
row_output_format->finalize();
if constexpr (no_newline)
{
// replace '\n' with '\0'
@ -70,16 +81,11 @@ public:
}
else
writeChar('\0', buffer);
offsets[row] = buffer.count();
});
/// This function make sense only for row output formats.
if (!dynamic_cast<IRowOutputFormat *>(out.get()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName());
offsets[i] = buffer.count();
row_output_format->resetFormatter();
}
/// Don't write prefix if any.
out->doNotWritePrefix();
out->write(arg_columns);
return col_str;
}

View File

@ -10,12 +10,12 @@ namespace ErrorCodes
}
PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= 0*/)
: BufferWithOwnMemory(start_size_), sub_buf(sub_buf_)
: BufferWithOwnMemory(start_size_), sub_buf(&sub_buf_)
{
padded &= sub_buf.isPadded();
padded &= sub_buf->isPadded();
/// Read from sub-buffer
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
@ -23,17 +23,26 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
void PeekableReadBuffer::reset()
{
checkStateCorrect();
}
void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_)
{
sub_buf = &sub_buf_;
resetImpl();
}
void PeekableReadBuffer::resetImpl()
{
peeked_size = 0;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
use_stack_memory = true;
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
sub_buf->position() = pos;
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
@ -43,20 +52,20 @@ bool PeekableReadBuffer::peekNext()
checkStateCorrect();
Position copy_from = pos;
size_t bytes_to_copy = sub_buf.available();
size_t bytes_to_copy = sub_buf->available();
if (useSubbufferOnly())
{
/// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
if (checkpoint)
copy_from = *checkpoint;
bytes_to_copy = sub_buf.buffer().end() - copy_from;
bytes_to_copy = sub_buf->buffer().end() - copy_from;
if (!bytes_to_copy)
{
sub_buf.position() = copy_from;
sub_buf->position() = copy_from;
/// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data.
bool res = sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
bool res = sub_buf->next();
BufferBase::set(sub_buf->buffer().begin(), sub_buf->buffer().size(), sub_buf->offset());
if (checkpoint)
checkpoint.emplace(pos);
@ -70,13 +79,13 @@ bool PeekableReadBuffer::peekNext()
if (useSubbufferOnly())
{
sub_buf.position() = copy_from;
sub_buf->position() = copy_from;
}
char * memory_data = getMemoryData();
/// Save unread data from sub-buffer to own memory
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_copy);
memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_copy);
/// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary
/// Otherwise, checkpoint now at the beginning of own memory
@ -106,10 +115,10 @@ bool PeekableReadBuffer::peekNext()
}
peeked_size += bytes_to_copy;
sub_buf.position() += bytes_to_copy;
sub_buf->position() += bytes_to_copy;
checkStateCorrect();
return sub_buf.next();
return sub_buf->next();
}
void PeekableReadBuffer::rollbackToCheckpoint(bool drop)
@ -152,7 +161,7 @@ bool PeekableReadBuffer::nextImpl()
if (checkpoint)
{
if (currentlyReadFromOwnMemory())
res = sub_buf.hasPendingData() || sub_buf.next();
res = sub_buf->hasPendingData() || sub_buf->next();
else
res = peekNext();
}
@ -161,21 +170,21 @@ bool PeekableReadBuffer::nextImpl()
if (useSubbufferOnly())
{
/// Load next data to sub_buf
sub_buf.position() = position();
res = sub_buf.next();
sub_buf->position() = position();
res = sub_buf->next();
}
else
{
/// All copied data have been read from own memory, continue reading from sub_buf
peeked_size = 0;
res = sub_buf.hasPendingData() || sub_buf.next();
res = sub_buf->hasPendingData() || sub_buf->next();
}
}
/// Switch to reading from sub_buf (or just update it if already switched)
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
nextimpl_working_buffer_offset = sub_buf.offset();
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
nextimpl_working_buffer_offset = sub_buf->offset();
if (checkpoint_at_end)
{
@ -199,8 +208,8 @@ void PeekableReadBuffer::checkStateCorrect() const
throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory() && pos < *checkpoint)
throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf.position())
throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf->position())
throw DB::Exception("Current position in subbuffer less than sub_buf->position()", ErrorCodes::LOGICAL_ERROR);
}
else
{
@ -294,11 +303,11 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
return; /// it's already continuous
size_t bytes_to_append = pos - sub_buf.position();
size_t bytes_to_append = pos - sub_buf->position();
resizeOwnMemoryIfNecessary(bytes_to_append);
char * memory_data = getMemoryData();
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_append);
sub_buf.position() = pos;
memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_append);
sub_buf->position() = pos;
peeked_size += bytes_to_append;
BufferBase::set(memory_data, peeked_size, peeked_size);
}
@ -306,7 +315,7 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
PeekableReadBuffer::~PeekableReadBuffer()
{
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
sub_buf->position() = pos;
}
bool PeekableReadBuffer::hasUnreadData() const

View File

@ -24,7 +24,7 @@ public:
~PeekableReadBuffer() override;
void prefetch() override { sub_buf.prefetch(); }
void prefetch() override { sub_buf->prefetch(); }
/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()
@ -71,13 +71,17 @@ public:
// without recreating the buffer.
void reset();
void setSubBuffer(ReadBuffer & sub_buf_);
private:
bool nextImpl() override;
void resetImpl();
bool peekNext();
inline bool useSubbufferOnly() const { return !peeked_size; }
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); }
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf->buffer().begin(); }
inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; }
void checkStateCorrect() const;
@ -90,7 +94,7 @@ private:
const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); }
ReadBuffer & sub_buf;
ReadBuffer * sub_buf;
size_t peeked_size = 0;
std::optional<Position> checkpoint = std::nullopt;
bool checkpoint_in_own_memory = false;

View File

@ -26,6 +26,7 @@ template <typename VectorType>
class WriteBufferFromVector : public WriteBuffer
{
public:
using ValueType = typename VectorType::value_type;
explicit WriteBufferFromVector(VectorType & vector_)
: WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
{
@ -50,9 +51,11 @@ public:
bool isFinished() const { return finalized; }
void restart()
void restart(std::optional<size_t> max_capacity = std::nullopt)
{
if (vector.empty())
if (max_capacity && vector.capacity() > max_capacity)
VectorType(initial_size, ValueType()).swap(vector);
else if (vector.empty())
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
finalized = false;
@ -68,8 +71,8 @@ private:
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT
+ sizeof(typename VectorType::value_type) - 1) /// Align up.
/ sizeof(typename VectorType::value_type));
+ sizeof(ValueType) - 1) /// Align up.
/ sizeof(ValueType));
/// Prevent further writes.
set(nullptr, 0);

View File

@ -1216,7 +1216,6 @@ void executeQuery(
compressed_buffer ? *compressed_buffer : *out_buf,
materializeBlock(pipeline.getHeader()),
context,
{},
output_format_settings);
out->setAutoFlush();

View File

@ -34,8 +34,15 @@ MutableColumns StreamingFormatExecutor::getResultColumns()
size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
{
auto & initial_buf = format->getReadBuffer();
format->setReadBuffer(buffer);
return execute();
size_t rows = execute();
/// Format destructor can touch read buffer (for example when we use PeekableReadBuffer),
/// but we cannot control lifetime of provided read buffer. To avoid heap use after free
/// we can set initial read buffer back, because initial read buffer was created before
/// format, so it will be destructed after it.
format->setReadBuffer(initial_buf);
return rows;
}
size_t StreamingFormatExecutor::execute()

View File

@ -2,6 +2,7 @@
#include <Processors/Formats/IInputFormat.h>
#include <Processors/ISimpleTransform.h>
#include <IO/EmptyReadBuffer.h>
namespace DB
{

View File

@ -38,7 +38,7 @@ public:
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
const ReadBuffer & getReadBuffer() const { return *in; }
ReadBuffer & getReadBuffer() const { return *in; }
virtual const BlockMissingValues & getMissingValues() const
{

View File

@ -65,7 +65,7 @@ static Chunk prepareTotals(Chunk chunk)
void IOutputFormat::work()
{
writePrefixIfNot();
writePrefixIfNeeded();
if (finished && !finalized)
{
@ -73,6 +73,8 @@ void IOutputFormat::work()
setRowsBeforeLimit(rows_before_limit_counter->get());
finalize();
if (auto_flush)
flush();
return;
}
@ -84,7 +86,7 @@ void IOutputFormat::work()
consume(std::move(current_chunk));
break;
case Totals:
writeSuffixIfNot();
writeSuffixIfNeeded();
if (auto totals = prepareTotals(std::move(current_chunk)))
{
consumeTotals(std::move(totals));
@ -92,7 +94,7 @@ void IOutputFormat::work()
}
break;
case Extremes:
writeSuffixIfNot();
writeSuffixIfNeeded();
consumeExtremes(std::move(current_chunk));
break;
}
@ -110,7 +112,7 @@ void IOutputFormat::flush()
void IOutputFormat::write(const Block & block)
{
writePrefixIfNot();
writePrefixIfNeeded();
consume(Chunk(block.getColumns(), block.rows()));
if (auto_flush)
@ -121,9 +123,10 @@ void IOutputFormat::finalize()
{
if (finalized)
return;
writePrefixIfNot();
writeSuffixIfNot();
writePrefixIfNeeded();
writeSuffixIfNeeded();
finalizeImpl();
finalizeBuffers();
finalized = true;
}

View File

@ -61,13 +61,13 @@ public:
void setTotals(const Block & totals)
{
writeSuffixIfNot();
writeSuffixIfNeeded();
consumeTotals(Chunk(totals.getColumns(), totals.rows()));
are_totals_written = true;
}
void setExtremes(const Block & extremes)
{
writeSuffixIfNot();
writeSuffixIfNeeded();
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
}
@ -76,6 +76,14 @@ public:
void doNotWritePrefix() { need_write_prefix = false; }
void resetFormatter()
{
need_write_prefix = true;
need_write_suffix = true;
finalized = false;
resetFormatterImpl();
}
/// Reset the statistics watch to a specific point in time
/// If set to not running it will stop on the call (elapsed = now() - given start)
void setStartTime(UInt64 start, bool is_running)
@ -85,17 +93,7 @@ public:
statistics.watch.stop();
}
protected:
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
void writePrefixIfNot()
void writePrefixIfNeeded()
{
if (need_write_prefix)
{
@ -104,7 +102,11 @@ protected:
}
}
void writeSuffixIfNot()
protected:
friend class ParallelFormattingOutputFormat;
void writeSuffixIfNeeded()
{
if (need_write_suffix)
{
@ -113,6 +115,15 @@ protected:
}
}
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void finalizeBuffers() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
virtual void resetFormatterImpl() {}
/// Methods-helpers for parallel formatting.
/// Set the number of rows that was already read in

View File

@ -10,12 +10,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_)
IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_)
: IOutputFormat(header, out_)
, num_columns(header.columns())
, types(header.getDataTypes())
, serializations(header.getSerializations())
, params(params_)
{
}
@ -26,14 +25,10 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
for (size_t row = 0; row < num_rows; ++row)
{
if (!first_row || getRowsReadBefore() != 0)
if (haveWrittenData())
writeRowBetweenDelimiter();
write(columns, row);
if (params.callback)
params.callback(columns, row);
first_row = false;
}
}

View File

@ -9,14 +9,6 @@
namespace DB
{
struct RowOutputFormatParams
{
using WriteCallback = std::function<void(const Columns & columns,size_t row)>;
// Callback used to indicate that another row is written.
WriteCallback callback;
};
class WriteBuffer;
/** Output format that writes data row by row.
@ -24,10 +16,17 @@ class WriteBuffer;
class IRowOutputFormat : public IOutputFormat
{
public:
using Params = RowOutputFormatParams;
/// Used to work with IRowOutputFormat explicitly.
void writeRow(const Columns & columns, size_t row_num)
{
first_row = false;
write(columns, row_num);
}
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
protected:
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
IRowOutputFormat(const Block & header, WriteBuffer & out_);
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
@ -51,7 +50,6 @@ protected:
virtual void writeFieldDelimiter() {} /// delimiter between values
virtual void writeRowStartDelimiter() {} /// delimiter before each row
virtual void writeRowEndDelimiter() {} /// delimiter after each row
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
virtual void writePrefix() override {} /// delimiter before resultset
virtual void writeSuffix() override {} /// delimiter after resultset
virtual void writeBeforeTotals() {}
@ -60,10 +58,11 @@ protected:
virtual void writeAfterExtremes() {}
virtual void finalizeImpl() override {} /// Write something after resultset, totals end extremes.
bool haveWrittenData() { return !first_row || getRowsReadBefore() != 0; }
size_t num_columns;
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
};

View File

@ -21,7 +21,6 @@ ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block &
: IOutputFormat(header_, out_)
, stream{stream_}
, format_settings{format_settings_}
, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
{
}
@ -65,8 +64,15 @@ void ArrowBlockOutputFormat::finalizeImpl()
"Error while closing a table: {}", status.ToString());
}
void ArrowBlockOutputFormat::resetFormatterImpl()
{
writer.reset();
arrow_ostream.reset();
}
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
arrow_ostream = std::make_shared<ArrowBufferedOutputStream>(out);
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
// TODO: should we use arrow::ipc::IpcOptions::alignment?
@ -88,7 +94,6 @@ void registerOutputFormatArrow(FormatFactory & factory)
"Arrow",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
@ -99,7 +104,6 @@ void registerOutputFormatArrow(FormatFactory & factory)
"ArrowStream",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);

View File

@ -27,6 +27,7 @@ public:
private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);

View File

@ -906,11 +906,15 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_)
, schema_registry(getConfluentSchemaRegistry(format_settings_))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(*in))
, decoder(avro::binaryDecoder())
, format_settings(format_settings_)
{
}
void AvroConfluentRowInputFormat::readPrefix()
{
input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
decoder = avro::binaryDecoder();
decoder->init(*input_stream);
}

View File

@ -163,6 +163,7 @@ public:
private:
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;

View File

@ -442,8 +442,8 @@ static avro::Codec getCodec(const std::string & codec_name)
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_)
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings))
{
@ -477,67 +477,24 @@ void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
file_writer_ptr->incr();
}
void AvroRowOutputFormat::writeSuffix()
void AvroRowOutputFormat::finalizeImpl()
{
file_writer_ptr->close();
}
void AvroRowOutputFormat::resetFormatterImpl()
{
file_writer_ptr.reset();
}
void AvroRowOutputFormat::consume(DB::Chunk chunk)
{
if (params.callback)
consumeImplWithCallback(std::move(chunk));
else
consumeImpl(std::move(chunk));
}
void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows; ++row)
{
write(columns, row);
}
}
void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows;)
{
size_t current_row = row;
/// used by WriteBufferToKafkaProducer to obtain auxiliary data
/// from the starting row of a file
writePrefixIfNot();
for (size_t row_in_file = 0;
row_in_file < settings.avro.output_rows_in_file && row < num_rows;
++row, ++row_in_file)
{
write(columns, row);
}
file_writer_ptr->flush();
writeSuffix();
need_write_prefix = true;
params.callback(columns, current_row);
}
}
void registerOutputFormatAvro(FormatFactory & factory)
{
factory.registerOutputFormat("Avro", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<AvroRowOutputFormat>(buf, sample, settings);
});
factory.markFormatHasNoAppendSupport("Avro");
}

View File

@ -46,27 +46,23 @@ private:
class AvroRowOutputFormat final : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
void consume(Chunk) override;
String getName() const override { return "AvroRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
virtual void writePrefix() override;
virtual void writeSuffix() override;
virtual void finalizeImpl() override;
virtual void resetFormatterImpl() override;
void createFileWriter();
FormatSettings settings;
AvroSerializer serializer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;
void consumeImpl(Chunk);
void consumeImplWithCallback(Chunk);
};
}

View File

@ -43,8 +43,8 @@ static String toValidUTF8String(const String & name)
}
BSONEachRowRowOutputFormat::BSONEachRowRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_), settings(settings_)
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_), settings(settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
fields.reserve(sample.columns());
@ -519,8 +519,8 @@ void registerOutputFormatBSONEachRow(FormatFactory & factory)
{
factory.registerOutputFormat(
"BSONEachRow",
[](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams & params, const FormatSettings & _format_settings)
{ return std::make_shared<BSONEachRowRowOutputFormat>(buf, sample, params, _format_settings); });
[](WriteBuffer & buf, const Block & sample, const FormatSettings & _format_settings)
{ return std::make_shared<BSONEachRowRowOutputFormat>(buf, sample, _format_settings); });
factory.markOutputFormatSupportsParallelFormatting("BSONEachRow");
}

View File

@ -47,7 +47,7 @@ class BSONEachRowRowOutputFormat final : public IRowOutputFormat
{
public:
BSONEachRowRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
String getName() const override { return "BSONEachRowRowOutputFormat"; }

View File

@ -10,8 +10,8 @@
namespace DB
{
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: IRowOutputFormat(header, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
}
@ -55,10 +55,9 @@ void registerOutputFormatRowBinary(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, params, format_settings);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};

View File

@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat final: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
String getName() const override { return "BinaryRowOutputFormat"; }

View File

@ -9,8 +9,8 @@ namespace DB
{
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@ -24,11 +24,10 @@ void CSVRowOutputFormat::writeLine(const std::vector<String> & values)
for (size_t i = 0; i < values.size(); ++i)
{
writeCSVString(values[i], out);
if (i + 1 == values.size())
writeRowEndDelimiter();
else
if (i + 1 != values.size())
writeFieldDelimiter();
}
writeRowEndDelimiter();
}
void CSVRowOutputFormat::writePrefix()
@ -36,10 +35,16 @@ void CSVRowOutputFormat::writePrefix()
const auto & sample = getPort(PortKind::Main).getHeader();
if (with_names)
{
writeLine(sample.getNames());
writeRowBetweenDelimiter();
}
if (with_types)
{
writeLine(sample.getDataTypeNames());
writeRowBetweenDelimiter();
}
}
@ -55,21 +60,38 @@ void CSVRowOutputFormat::writeFieldDelimiter()
}
void CSVRowOutputFormat::writeRowEndDelimiter()
void CSVRowOutputFormat::writeRowBetweenDelimiter()
{
if (format_settings.csv.crlf_end_of_line)
writeChar('\r', out);
writeChar('\n', out);
}
void CSVRowOutputFormat::writeSuffix()
{
/// Write '\n' after data if we had any data.
if (haveWrittenData())
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeBeforeTotals()
{
writeChar('\n', out);
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeBeforeExtremes()
{
writeChar('\n', out);
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeAfterTotals()
{
writeRowBetweenDelimiter();
}
void CSVRowOutputFormat::writeAfterExtremes()
{
writeRowBetweenDelimiter();
}
@ -80,10 +102,9 @@ void registerOutputFormatCSV(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, params, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};

View File

@ -20,7 +20,7 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types, const FormatSettings & format_settings_);
String getName() const override { return "CSVRowOutputFormat"; }
@ -33,15 +33,18 @@ public:
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; }
bool supportExtremes() const override { return true; }
void writeBeforeTotals() override;
void writeAfterTotals() override;
void writeBeforeExtremes() override;
void writeAfterExtremes() override;
void writePrefix() override;
void writeSuffix() override;
void writeLine(const std::vector<String> & values);
bool with_names;

View File

@ -42,10 +42,9 @@ void CapnProtoOutputStream::write(const void * buffer, size_t size)
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
: IRowOutputFormat(header_, out_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
{
schema = schema_parser.getMessageSchema(info);
checkCapnProtoSchemaStructure(schema, getPort(PortKind::Main).getHeader(), format_settings.capn_proto.enum_comparing_mode);
@ -264,10 +263,9 @@ void registerOutputFormatCapnProto(FormatFactory & factory)
factory.registerOutputFormat("CapnProto", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, params, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
});
}

View File

@ -29,7 +29,6 @@ public:
CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_);

View File

@ -88,8 +88,7 @@ void CustomSeparatedRowInputFormat::syncAfterError()
void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
CustomSeparatedFormatReader::CustomSeparatedFormatReader(

View File

@ -8,8 +8,8 @@ namespace DB
{
CustomSeparatedRowOutputFormat::CustomSeparatedRowOutputFormat(
const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_, params_)
const Block & header_, WriteBuffer & out_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_)
, with_names(with_names_)
, with_types(with_types_)
, format_settings(format_settings_)
@ -84,10 +84,9 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, params, settings, with_names, with_types);
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, settings, with_names, with_types);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -11,7 +11,7 @@ class WriteBuffer;
class CustomSeparatedRowOutputFormat final : public IRowOutputFormat
{
public:
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
String getName() const override { return "CustomSeparatedRowOutputFormat"; }

View File

@ -1,4 +1,5 @@
#include <Processors/Formats/Impl/HiveTextRowInputFormat.h>
#include <Common/assert_cast.h>
#if USE_HIVE
@ -31,12 +32,17 @@ HiveTextRowInputFormat::HiveTextRowInputFormat(
HiveTextRowInputFormat::HiveTextRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_, const FormatSettings & format_settings_)
: CSVRowInputFormat(
header_, *buf_, params_, true, false, format_settings_, std::make_unique<HiveTextFormatReader>(std::move(buf_), format_settings_))
header_, *buf_, params_, true, false, format_settings_, std::make_unique<HiveTextFormatReader>(*buf_, format_settings_)), buf(std::move(buf_))
{
}
HiveTextFormatReader::HiveTextFormatReader(std::unique_ptr<PeekableReadBuffer> buf_, const FormatSettings & format_settings_)
: CSVFormatReader(*buf_, format_settings_), buf(std::move(buf_)), input_field_names(format_settings_.hive_text.input_field_names)
void HiveTextRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf->setSubBuffer(in_);
}
HiveTextFormatReader::HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_)
: CSVFormatReader(buf_, format_settings_), buf(&buf_), input_field_names(format_settings_.hive_text.input_field_names)
{
}
@ -53,6 +59,12 @@ std::vector<String> HiveTextFormatReader::readTypes()
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "HiveTextRowInputFormat::readTypes is not implemented");
}
void HiveTextFormatReader::setReadBuffer(ReadBuffer & buf_)
{
buf = assert_cast<PeekableReadBuffer *>(&buf_);
CSVFormatReader::setReadBuffer(buf_);
}
void registerInputFormatHiveText(FormatFactory & factory)
{
factory.registerInputFormat(

View File

@ -18,21 +18,27 @@ public:
String getName() const override { return "HiveTextRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
HiveTextRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_, const FormatSettings & format_settings_);
std::unique_ptr<PeekableReadBuffer> buf;
};
class HiveTextFormatReader final : public CSVFormatReader
{
public:
HiveTextFormatReader(std::unique_ptr<PeekableReadBuffer> buf_, const FormatSettings & format_settings_);
HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_);
std::vector<String> readNames() override;
std::vector<String> readTypes() override;
void setReadBuffer(ReadBuffer & buf_) override;
private:
std::unique_ptr<PeekableReadBuffer> buf;
PeekableReadBuffer * buf;
std::vector<String> input_field_names;
};

View File

@ -98,8 +98,7 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}

View File

@ -34,7 +34,6 @@ void registerOutputFormatJSONColumns(FormatFactory & factory)
factory.registerOutputFormat("JSONColumns", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsBlockOutputFormat>(buf, sample, format_settings, format_settings.json.validate_utf8);

View File

@ -28,7 +28,6 @@ void JSONColumnsBlockOutputFormatBase::consume(Chunk chunk)
void JSONColumnsBlockOutputFormatBase::writeSuffix()
{
writeChunk(mono_chunk);
mono_chunk.clear();
}

View File

@ -91,12 +91,18 @@ void JSONColumnsWithMetadataBlockOutputFormat::finalizeImpl()
ostr->next();
}
void JSONColumnsWithMetadataBlockOutputFormat::resetFormatterImpl()
{
JSONColumnsBlockOutputFormat::resetFormatterImpl();
rows = 0;
statistics = Statistics();
}
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory)
{
factory.registerOutputFormat("JSONColumnsWithMetadata", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsWithMetadataBlockOutputFormat>(buf, sample, format_settings);

View File

@ -53,6 +53,7 @@ protected:
void writePrefix() override;
void writeSuffix() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void writeChunkStart() override;
void writeChunkEnd() override;

View File

@ -32,7 +32,6 @@ void registerOutputFormatJSONCompactColumns(FormatFactory & factory)
factory.registerOutputFormat("JSONCompactColumns", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactColumnsBlockOutputFormat>(buf, sample, format_settings);

View File

@ -1,7 +1,6 @@
#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
#include <IO/ReadHelpers.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/Operators.h>
#include <Formats/FormatFactory.h>
#include <Formats/verbosePrintString.h>

View File

@ -11,12 +11,11 @@ namespace DB
JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool with_types_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_)
, settings(settings_)
, with_names(with_names_)
, with_types(with_types_)
@ -53,12 +52,17 @@ void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter()
void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter()
{
writeCString("]\n", *ostr);
writeChar(']', *ostr);
}
void JSONCompactEachRowRowOutputFormat::writeRowBetweenDelimiter()
{
writeChar('\n', *ostr);
}
void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeChar('\n', *ostr);
writeRowBetweenDelimiter();
size_t columns_size = columns.size();
writeRowStartDelimiter();
for (size_t i = 0; i < columns_size; ++i)
@ -69,6 +73,7 @@ void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, siz
writeField(*columns[i], *serializations[i], row_num);
}
writeRowEndDelimiter();
writeRowBetweenDelimiter();
}
void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector<String> & values)
@ -80,7 +85,7 @@ void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector<String> & va
writeChar('\"', *ostr);
writeString(values[i], *ostr);
writeChar('\"', *ostr);
if (i != values.size() - 1)
if (i + 1 != values.size())
writeFieldDelimiter();
}
writeRowEndDelimiter();
@ -91,10 +96,22 @@ void JSONCompactEachRowRowOutputFormat::writePrefix()
const auto & header = getPort(PortKind::Main).getHeader();
if (with_names)
{
writeLine(JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, settings.json.validate_utf8));
writeRowBetweenDelimiter();
}
if (with_types)
{
writeLine(JSONUtils::makeNamesValidJSONStrings(header.getDataTypeNames(), settings, settings.json.validate_utf8));
writeRowBetweenDelimiter();
}
}
void JSONCompactEachRowRowOutputFormat::writeSuffix()
{
if (haveWrittenData())
writeChar('\n', *ostr);
}
void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
@ -112,10 +129,9 @@ void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
factory.registerOutputFormat(format_name, [yield_strings, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, with_names, with_types, yield_strings);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, format_settings, with_names, with_types, yield_strings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -17,7 +17,6 @@ public:
JSONCompactEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool with_types_,
@ -27,6 +26,7 @@ public:
private:
void writePrefix() override;
void writeSuffix() override;
void writeTotals(const Columns & columns, size_t row_num) override;
@ -34,6 +34,7 @@ private:
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; }
void consumeTotals(Chunk) override;

View File

@ -11,10 +11,9 @@ namespace DB
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: JSONRowOutputFormat(out_, header, params_, settings_, yield_strings_)
: JSONRowOutputFormat(out_, header, settings_, yield_strings_)
{
}
@ -72,10 +71,9 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
factory.registerOutputFormat("JSONCompact", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompact");
@ -83,10 +81,9 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
factory.registerOutputFormat("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, true);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStrings");

View File

@ -19,7 +19,6 @@ public:
JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);

View File

@ -278,6 +278,7 @@ void JSONEachRowRowInputFormat::resetParser()
read_columns.clear();
seen_columns.clear();
prev_positions.clear();
allow_new_rows = true;
}
void JSONEachRowRowInputFormat::readPrefix()

View File

@ -12,9 +12,8 @@ namespace DB
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_),
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_),
settings(settings_)
{
fields = JSONUtils::makeNamesValidJSONStrings(getPort(PortKind::Main).getHeader().getNames(), settings, settings.json.validate_utf8);
@ -42,49 +41,17 @@ void JSONEachRowRowOutputFormat::writeRowStartDelimiter()
void JSONEachRowRowOutputFormat::writeRowEndDelimiter()
{
// Why do we need this weird `if`?
//
// The reason is the formatRow function that is broken with respect to
// row-between delimiters. It should not write them, but it does, and then
// hacks around it by having a special formatRowNoNewline version, which, as
// you guessed, removes the newline from the end of row. But the row-between
// delimiter goes into a second row, so it turns out to be in the beginning
// of the line, and the removal doesn't work. There is also a second bug --
// the row-between delimiter in this format is written incorrectly. In fact,
// it is not written at all, and the newline is written in a row-end
// delimiter ("}\n" instead of the correct "}"). With these two bugs
// combined, the test 01420_format_row works perfectly.
//
// A proper implementation of formatRow would use IRowOutputFormat directly,
// and not write row-between delimiters, instead of using IOutputFormat
// processor and its crutch row callback. This would require exposing
// IRowOutputFormat, which we don't do now, but which can be generally useful
// for other cases such as parallel formatting, that also require a control
// flow different from the usual IOutputFormat.
//
// I just don't have time or energy to redo all of this, but I need to
// support JSON array output here, which requires proper ",\n" row-between
// delimiters. For compatibility, I preserve the bug in case of non-array
// output.
if (settings.json.array_of_rows)
{
writeChar('}', *ostr);
}
else
{
writeCString("}\n", *ostr);
}
writeCString("}", *ostr);
field_number = 0;
}
void JSONEachRowRowOutputFormat::writeRowBetweenDelimiter()
{
// We preserve an existing bug here for compatibility. See the comment above.
if (settings.json.array_of_rows)
{
writeCString(",\n", *ostr);
}
writeChar(',', *ostr);
writeChar('\n', *ostr);
}
@ -100,9 +67,9 @@ void JSONEachRowRowOutputFormat::writePrefix()
void JSONEachRowRowOutputFormat::writeSuffix()
{
if (settings.json.array_of_rows)
{
writeCString("\n]\n", *ostr);
}
else if (haveWrittenData())
writeChar('\n', *ostr);
}
@ -113,13 +80,11 @@ void registerOutputFormatJSONEachRow(FormatFactory & factory)
factory.registerOutputFormat(format, [serialize_as_strings](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = serialize_as_strings;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting(format);
};

View File

@ -17,7 +17,6 @@ public:
JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONEachRowRowOutputFormat"; }

View File

@ -10,13 +10,16 @@ namespace DB
void JSONEachRowWithProgressRowOutputFormat::writeRowStartDelimiter()
{
if (has_progress)
{
writeProgress();
writeRowBetweenDelimiter();
}
writeCString("{\"row\":{", *ostr);
}
void JSONEachRowWithProgressRowOutputFormat::writeRowEndDelimiter()
{
writeCString("}}\n", *ostr);
writeCString("}}", *ostr);
field_number = 0;
}
@ -27,7 +30,7 @@ void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
WriteBufferFromString buf(progress_line);
writeCString("{\"progress\":", buf);
progress.writeJSON(buf);
writeCString("}\n", buf);
writeCString("}", buf);
buf.finalize();
std::lock_guard lock(progress_lines_mutex);
progress_lines.emplace_back(std::move(progress_line));
@ -37,22 +40,33 @@ void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
void JSONEachRowWithProgressRowOutputFormat::flush()
{
if (has_progress)
{
if (haveWrittenData())
writeRowBetweenDelimiter();
writeProgress();
}
JSONEachRowRowOutputFormat::flush();
}
void JSONEachRowWithProgressRowOutputFormat::writeSuffix()
{
if (has_progress)
{
writeRowBetweenDelimiter();
writeProgress();
}
JSONEachRowRowOutputFormat::writeSuffix();
}
void JSONEachRowWithProgressRowOutputFormat::writeProgress()
{
std::lock_guard lock(progress_lines_mutex);
for (const auto & progress_line : progress_lines)
writeString(progress_line, *ostr);
for (size_t i = 0; i != progress_lines.size(); ++i)
{
if (i != 0)
writeRowBetweenDelimiter();
writeString(progress_lines[i], *ostr);
}
progress_lines.clear();
has_progress = false;
}
@ -62,25 +76,21 @@ void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
factory.registerOutputFormat("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, settings);
});
factory.registerOutputFormat("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = true;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, settings);
});
}

View File

@ -6,8 +6,8 @@
namespace DB
{
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, params_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
{
}
@ -71,12 +71,11 @@ void registerOutputFormatJSONObjectEachRow(FormatFactory & factory)
factory.registerOutputFormat("JSONObjectEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONObjectEachRow");
factory.markFormatHasNoAppendSupport("JSONObjectEachRow");

View File

@ -23,7 +23,6 @@ public:
JSONObjectEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONObjectEachRowRowOutputFormat"; }

View File

@ -11,10 +11,9 @@ namespace DB
JSONRowOutputFormat::JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_, params_), settings(settings_), yield_strings(yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_), settings(settings_), yield_strings(yield_strings_)
{
names = JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, true);
}
@ -126,6 +125,13 @@ void JSONRowOutputFormat::finalizeImpl()
ostr->next();
}
void JSONRowOutputFormat::resetFormatterImpl()
{
RowOutputFormatWithUTF8ValidationAdaptor::resetFormatterImpl();
row_count = 0;
statistics = Statistics();
}
void JSONRowOutputFormat::onProgress(const Progress & value)
{
@ -138,10 +144,9 @@ void registerOutputFormatJSON(FormatFactory & factory)
factory.registerOutputFormat("JSON", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
@ -150,10 +155,9 @@ void registerOutputFormatJSON(FormatFactory & factory)
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");

View File

@ -19,7 +19,6 @@ public:
JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
@ -57,6 +56,7 @@ protected:
void writeAfterExtremes() override;
void finalizeImpl() override;
void resetFormatterImpl() override;
virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);

View File

@ -5,8 +5,8 @@
namespace DB
{
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_) {}
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_) {}
void MarkdownRowOutputFormat::writePrefix()
{
@ -47,7 +47,18 @@ void MarkdownRowOutputFormat::writeFieldDelimiter()
void MarkdownRowOutputFormat::writeRowEndDelimiter()
{
writeCString(" |\n", out);
writeCString(" |", out);
}
void MarkdownRowOutputFormat::writeRowBetweenDelimiter()
{
writeChar('\n', out);
}
void MarkdownRowOutputFormat::writeSuffix()
{
if (haveWrittenData())
writeChar('\n', out);
}
void MarkdownRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
@ -60,10 +71,9 @@ void registerOutputFormatMarkdown(FormatFactory & factory)
factory.registerOutputFormat("Markdown", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Markdown");

View File

@ -12,7 +12,7 @@ class ReadBuffer;
class MarkdownRowOutputFormat final : public IRowOutputFormat
{
public:
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "MarkdownRowOutputFormat"; }
@ -21,6 +21,7 @@ private:
/// |columnName1|columnName2|...|columnNameN|
/// |:-:|:-:|...|:-:|
void writePrefix() override;
void writeSuffix() override;
/// Write '|' before each row
void writeRowStartDelimiter() override;
@ -28,8 +29,11 @@ private:
/// Write '|' between values
void writeFieldDelimiter() override;
/// Write '|\n' after each row
void writeRowEndDelimiter() override ;
/// Write '|' at the end of each row
void writeRowEndDelimiter() override;
/// Write '\n' after each row
void writeRowBetweenDelimiter() override;
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;

View File

@ -429,8 +429,7 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(in_);
buf->setSubBuffer(in_);
}
MsgPackSchemaReader::MsgPackSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -32,8 +32,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), packer(out_), format_settings(format_settings_) {}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), packer(out_), format_settings(format_settings_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
@ -226,10 +226,9 @@ void registerOutputFormatMsgPack(FormatFactory & factory)
factory.registerOutputFormat("MsgPack", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("MsgPack");
}

View File

@ -17,7 +17,7 @@ namespace DB
class MsgPackRowOutputFormat final : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "MsgPackRowOutputFormat"; }

View File

@ -109,7 +109,6 @@ void registerOutputFormatMySQLWire(FormatFactory & factory)
"MySQLWire",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
}

View File

@ -115,7 +115,6 @@ void registerOutputFormatNative(FormatFactory & factory)
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeOutputFormat>(buf, sample);

View File

@ -13,7 +13,6 @@ void registerOutputFormatNull(FormatFactory & factory)
factory.registerOutputFormat("Null", [](
WriteBuffer &,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NullOutputFormat>(sample);

View File

@ -100,7 +100,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatODBCDriver2(FormatFactory & factory)
{
factory.registerOutputFormat(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
});

View File

@ -515,6 +515,11 @@ void ORCBlockOutputFormat::finalizeImpl()
writer->close();
}
void ORCBlockOutputFormat::resetFormatterImpl()
{
writer.reset();
}
void ORCBlockOutputFormat::prepareWriter()
{
const Block & header = getPort(PortKind::Main).getHeader();
@ -531,7 +536,6 @@ void registerOutputFormatORC(FormatFactory & factory)
factory.registerOutputFormat("ORC", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);

View File

@ -44,6 +44,7 @@ public:
private:
void consume(Chunk chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
std::unique_ptr<orc::Type> getORCType(const DataTypePtr & type);

View File

@ -20,6 +20,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/**
* ORDER-PRESERVING parallel formatting of data formats.
* The idea is similar to ParallelParsingInputFormat.
@ -167,6 +172,12 @@ private:
void finalizeImpl() override;
void resetFormatterImpl() override
{
/// Resetting parallel formatting is not obvious and it's not used anywhere
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method resetFormatterImpl is not implemented for parallel formatting");
}
InternalFormatterCreator internal_formatter_creator;
/// Status to synchronize multiple threads.

View File

@ -74,13 +74,17 @@ void ParquetBlockOutputFormat::finalizeImpl()
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
void ParquetBlockOutputFormat::resetFormatterImpl()
{
file_writer.reset();
}
void registerOutputFormatParquet(FormatFactory & factory)
{
factory.registerOutputFormat(
"Parquet",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);

View File

@ -36,6 +36,7 @@ public:
private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
const FormatSettings format_settings;

View File

@ -66,7 +66,6 @@ void registerOutputFormatPostgreSQLWire(FormatFactory & factory)
"PostgreSQLWire",
[](WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<PostgreSQLOutputFormat>(buf, sample, settings); });
}
}

View File

@ -54,6 +54,11 @@ protected:
const IColumn & column, const ISerialization & serialization, size_t row_num,
size_t value_width, size_t pad_to_width, bool align_right);
void resetFormatterImpl() override
{
total_rows = 0;
}
private:
bool mono_block;
/// For mono_block == true only
@ -68,7 +73,6 @@ void registerPrettyFormatWithNoEscapesAndMonoBlock(FormatFactory & factory, cons
fact.registerOutputFormat(name, [no_escapes, mono_block](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
if (no_escapes)

View File

@ -82,9 +82,8 @@ static Float64 tryParseFloat(const String & s)
PrometheusTextOutputFormat::PrometheusTextOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, string_serialization(DataTypeString().getDefaultSerialization())
, format_settings(format_settings_)
{
@ -339,10 +338,9 @@ void registerOutputFormatPrometheus(FormatFactory & factory)
factory.registerOutputFormat(FORMAT_NAME, [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<PrometheusTextOutputFormat>(buf, sample, params, settings);
return std::make_shared<PrometheusTextOutputFormat>(buf, sample, settings);
});
}

View File

@ -20,7 +20,6 @@ public:
PrometheusTextOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "PrometheusTextOutputFormat"; }

View File

@ -30,6 +30,12 @@ ProtobufListInputFormat::ProtobufListInputFormat(
{
}
void ProtobufListInputFormat::setReadBuffer(ReadBuffer & in_)
{
reader->setReadBuffer(in_);
IRowInputFormat::setReadBuffer(in_);
}
bool ProtobufListInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension)
{
if (reader->eof())

View File

@ -33,6 +33,8 @@ public:
String getName() const override { return "ProtobufListInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;

View File

@ -13,10 +13,9 @@ namespace DB
ProtobufListOutputFormat::ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
@ -42,18 +41,21 @@ void ProtobufListOutputFormat::finalizeImpl()
serializer->finalizeWrite();
}
void ProtobufListOutputFormat::resetFormatterImpl()
{
serializer->reset();
}
void registerOutputFormatProtobufList(FormatFactory & factory)
{
factory.registerOutputFormat(
"ProtobufList",
[](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufListOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings, "Protobuf", true),
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
settings.protobuf.output_nullables_with_google_wrappers);
});
}

View File

@ -26,7 +26,6 @@ public:
ProtobufListOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
bool defaults_for_nullable_google_wrappers_);
@ -39,6 +38,7 @@ private:
void writeField(const IColumn &, const ISerialization &, size_t) override {}
void finalizeImpl() override;
void resetFormatterImpl() override;
std::unique_ptr<ProtobufWriter> writer;
std::unique_ptr<ProtobufSerializer> serializer;

View File

@ -44,6 +44,12 @@ bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
return true;
}
void ProtobufRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
reader->setReadBuffer(in_);
IRowInputFormat::setReadBuffer(in_);
}
bool ProtobufRowInputFormat::allowSyncAfterError() const
{
return true;

View File

@ -38,6 +38,8 @@ public:
String getName() const override { return "ProtobufRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;
bool allowSyncAfterError() const override;

View File

@ -20,11 +20,10 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_)
: IRowOutputFormat(header_, out_, params_)
: IRowOutputFormat(header_, out_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
@ -59,14 +58,11 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
with_length_delimiter ? "Protobuf" : "ProtobufSingle",
[with_length_delimiter](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings, "Protobuf", true),
settings,
with_length_delimiter);
buf, header, FormatSchemaInfo(settings, "Protobuf", true),
settings, with_length_delimiter);
});
}
}

View File

@ -30,7 +30,6 @@ public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_);

View File

@ -8,9 +8,8 @@ namespace DB
RawBLOBRowOutputFormat::RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_)
: IRowOutputFormat(header_, out_, params_)
const Block & header_)
: IRowOutputFormat(header_, out_)
{
}
@ -30,10 +29,9 @@ void registerOutputFormatRawBLOB(FormatFactory & factory)
factory.registerOutputFormat("RawBLOB", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, params);
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample);
});
}

View File

@ -29,8 +29,7 @@ class RawBLOBRowOutputFormat final : public IRowOutputFormat
public:
RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_);
const Block & header_);
String getName() const override { return "RawBLOBRowOutputFormat"; }

View File

@ -130,8 +130,7 @@ bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -5,8 +5,8 @@
namespace DB
{
SQLInsertRowOutputFormat::SQLInsertRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), column_names(header_.getNames()), format_settings(format_settings_)
SQLInsertRowOutputFormat::SQLInsertRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), column_names(header_.getNames()), format_settings(format_settings_)
{
}
@ -65,36 +65,42 @@ void SQLInsertRowOutputFormat::writeRowEndDelimiter()
{
writeChar(')', out);
++rows_in_line;
if (rows_in_line >= format_settings.sql_insert.max_batch_size)
{
writeChar(';', out);
rows_in_line = 0;
}
}
void SQLInsertRowOutputFormat::writeRowBetweenDelimiter()
{
if (rows_in_line >= format_settings.sql_insert.max_batch_size)
{
writeCString(";\n", out);
rows_in_line = 0;
}
if (rows_in_line == 0)
writeChar('\n', out);
else
{
writeCString(", ", out);
}
}
void SQLInsertRowOutputFormat::writeSuffix()
{
writeCString(";\n", out);
if (rows_in_line != 0)
writeChar(';', out);
if (haveWrittenData())
writeChar('\n', out);
}
void SQLInsertRowOutputFormat::resetFormatterImpl()
{
rows_in_line = 0;
}
void registerOutputFormatSQLInsert(FormatFactory & factory)
{
factory.registerOutputFormat("SQLInsert", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<SQLInsertRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<SQLInsertRowOutputFormat>(buf, sample, settings);
});
}

View File

@ -16,7 +16,6 @@ public:
SQLInsertRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "SQLInsertRowOutputFormat"; }
@ -26,11 +25,12 @@ public:
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
virtual void writeFieldDelimiter() override;
virtual void writeRowStartDelimiter() override;
virtual void writeRowEndDelimiter() override;
virtual void writeRowBetweenDelimiter() override;
virtual void writeSuffix() override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
void writeSuffix() override;
void resetFormatterImpl() override;
void printLineStart();
void printColumnNames();

View File

@ -7,8 +7,8 @@
namespace DB
{
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, false, params_, format_settings_), fields(header.getNamesAndTypes())
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, false, format_settings_), fields(header.getNamesAndTypes())
{
for (auto & field : fields)
{
@ -30,20 +30,24 @@ void TSKVRowOutputFormat::writeField(const IColumn & column, const ISerializatio
void TSKVRowOutputFormat::writeRowEndDelimiter()
{
writeChar('\n', out);
field_number = 0;
}
void TSKVRowOutputFormat::writeRowBetweenDelimiter()
{
writeChar('\n', out);
}
void registerOutputFormatTSKV(FormatFactory & factory)
{
factory.registerOutputFormat("TSKV", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings);
return std::make_shared<TSKVRowOutputFormat>(buf, sample, settings);
});
factory.markOutputFormatSupportsParallelFormatting("TSKV");
}

View File

@ -14,13 +14,14 @@ namespace DB
class TSKVRowOutputFormat final : public TabSeparatedRowOutputFormat
{
public:
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings);
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings);
String getName() const override { return "TSKVRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
/// Disable totals and extremes, because they are enabled in TSV.
bool supportTotals() const override { return false; }

View File

@ -12,9 +12,8 @@ TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
bool with_names_,
bool with_types_,
bool is_raw_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), is_raw(is_raw_), format_settings(format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), is_raw(is_raw_), format_settings(format_settings_)
{
}
@ -26,11 +25,10 @@ void TabSeparatedRowOutputFormat::writeLine(const std::vector<String> & values)
writeString(values[i], out);
else
writeEscapedString(values[i], out);
if (i + 1 == values.size())
writeRowEndDelimiter();
else
if (i + 1 != values.size())
writeFieldDelimiter();
}
writeRowEndDelimiter();
}
void TabSeparatedRowOutputFormat::writePrefix()
@ -38,10 +36,16 @@ void TabSeparatedRowOutputFormat::writePrefix()
const auto & header = getPort(PortKind::Main).getHeader();
if (with_names)
{
writeLine(header.getNames());
writeRowBetweenDelimiter();
}
if (with_types)
{
writeLine(header.getDataTypeNames());
writeRowBetweenDelimiter();
}
}
@ -60,21 +64,38 @@ void TabSeparatedRowOutputFormat::writeFieldDelimiter()
}
void TabSeparatedRowOutputFormat::writeRowEndDelimiter()
void TabSeparatedRowOutputFormat::writeRowBetweenDelimiter()
{
if (format_settings.tsv.crlf_end_of_line)
writeChar('\r', out);
writeChar('\n', out);
}
void TabSeparatedRowOutputFormat::writeSuffix()
{
/// Output '\n' an the end of data if we had any data.
if (haveWrittenData())
writeRowBetweenDelimiter();
}
void TabSeparatedRowOutputFormat::writeBeforeTotals()
{
writeChar('\n', out);
writeRowBetweenDelimiter();
}
void TabSeparatedRowOutputFormat::writeBeforeExtremes()
{
writeChar('\n', out);
writeRowBetweenDelimiter();
}
void TabSeparatedRowOutputFormat::writeAfterTotals()
{
writeRowBetweenDelimiter();
}
void TabSeparatedRowOutputFormat::writeAfterExtremes()
{
writeRowBetweenDelimiter();
}
void registerOutputFormatTabSeparated(FormatFactory & factory)
@ -86,10 +107,9 @@ void registerOutputFormatTabSeparated(FormatFactory & factory)
factory.registerOutputFormat(format_name, [is_raw, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, with_names, with_types, is_raw, params, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, with_names, with_types, is_raw, settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);

View File

@ -25,7 +25,6 @@ public:
bool with_names_,
bool with_types_,
bool is_raw_,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "TabSeparatedRowOutputFormat"; }
@ -36,15 +35,18 @@ public:
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override final;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; }
bool supportExtremes() const override { return true; }
void writeBeforeTotals() override final;
void writeAfterTotals() override final;
void writeBeforeExtremes() override final;
void writeAfterExtremes() override final;
void writePrefix() override;
void writeSuffix() override;
void writeLine(const std::vector<String> & values);
bool with_names;

View File

@ -133,9 +133,6 @@ void TemplateBlockOutputFormat::writePrefix()
void TemplateBlockOutputFormat::finalizeImpl()
{
if (finalized)
return;
size_t parts = format.format_idx_to_column_idx.size();
for (size_t i = 0; i < parts; ++i)
{
@ -180,17 +177,19 @@ void TemplateBlockOutputFormat::finalizeImpl()
}
writeString(format.delimiters[i + 1], out);
}
finalized = true;
}
void TemplateBlockOutputFormat::resetFormatterImpl()
{
row_count = 0;
statistics = Statistics();
}
void registerOutputFormatTemplate(FormatFactory & factory)
{
factory.registerOutputFormat("Template", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format;

View File

@ -44,6 +44,7 @@ private:
void consumeTotals(Chunk chunk) override { statistics.totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { statistics.extremes = std::move(chunk); }
void finalizeImpl() override;
void resetFormatterImpl() override;
void writeRow(const Chunk & chunk, size_t row_num);
template <typename U, typename V> void writeValue(U value, EscapingRule escaping_rule);

Some files were not shown because too many files have changed in this diff Show More