From 369b4d53efb1b9085c8d8b1e8f481e4f9af9df31 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sun, 26 Apr 2020 15:44:11 +0200 Subject: [PATCH] Adding support for `output_format_enable_streaming` format setting. --- src/Core/Settings.h | 1 + src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatSettings.h | 2 ++ src/Processors/Formats/Impl/CSVRowOutputFormat.cpp | 6 ++++++ src/Processors/Formats/Impl/CSVRowOutputFormat.h | 1 + .../0_stateless/01247_insert_into_watch_live_view_url.py | 3 +++ 6 files changed, 14 insertions(+) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 725171d4a1b..d45874e437f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -176,6 +176,7 @@ struct Settings : public SettingsCollection \ M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)", 0) \ \ + M(SettingBool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \ M(SettingBool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \ \ M(SettingBool, add_http_cors_header, false, "Write add http CORS header.", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 7d741004766..0034cab8600 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -87,6 +87,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context) { FormatSettings format_settings; + format_settings.enable_streaming = settings.output_format_enable_streaming; format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers; format_settings.json.quote_denormals = settings.output_format_json_quote_denormals; format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 1eb95ce2dbf..6c3c6dfc135 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -13,6 +13,8 @@ namespace DB */ struct FormatSettings { + bool enable_streaming = false; + struct JSON { bool quote_64bit_integers = true; diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp index e5ad8b007c1..f1e72e1c966 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp @@ -18,6 +18,12 @@ CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_ data_types[i] = sample.safeGetByPosition(i).type; } +void CSVRowOutputFormat::consume(DB::Chunk chunk) +{ + IRowOutputFormat::consume(std::move(chunk)); + if (format_settings.enable_streaming) + flush(); +} void CSVRowOutputFormat::writePrefix() { diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.h b/src/Processors/Formats/Impl/CSVRowOutputFormat.h index 28bd8edf31a..3b5bd49bd8a 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.h +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.h @@ -39,6 +39,7 @@ public: protected: + void consume(Chunk chunk) override; bool with_names; const FormatSettings format_settings; DataTypes data_types; diff --git a/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py index eaf336eb9a9..30729c49903 100755 --- a/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py +++ b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py @@ -24,6 +24,9 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.send('SET allow_experimental_live_view = 1') client1.expect(prompt) + client1.send('SET output_format_enable_streaming = 1') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send('DROP TABLE IF EXISTS test.mt')