diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp index 050a0d8ef8a..19b04d36fc1 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -17,7 +17,7 @@ ParallelParsingBlockInputStream::ParallelParsingBlockInputStream(const Params & // Subtract one thread that we use for segmentation and one for // reading. After that, must have at least two threads left for // parsing. See the assertion below. - pool(std::max(2, params.max_threads - 2)), + pool(std::max(2, static_cast(params.max_threads) - 2)), file_segmentation_engine(params.file_segmentation_engine) { // See comment above. diff --git a/src/DataStreams/ParallelParsingBlockInputStream.h b/src/DataStreams/ParallelParsingBlockInputStream.h index 4c110f8a937..c882acd9ddd 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.h +++ b/src/DataStreams/ParallelParsingBlockInputStream.h @@ -69,7 +69,7 @@ public: const InputProcessorCreator & input_processor_creator; const InputCreatorParams & input_creator_params; FormatFactory::FileSegmentationEngine file_segmentation_engine; - int max_threads; + size_t max_threads; size_t min_chunk_bytes; }; diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 4dc5b816420..728f9ae5a24 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -166,6 +166,9 @@ BlockInputStreamPtr FormatFactory::getInput( // (segmentator + two parsers + reader). bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4; + if (settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage) + parallel_parsing = false; + if (parallel_parsing && name == "JSONEachRow") { /// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix() @@ -195,7 +198,7 @@ BlockInputStreamPtr FormatFactory::getInput( auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings}; ParallelParsingBlockInputStream::Params params{buf, input_getter, input_creator_params, file_segmentation_engine, - static_cast(settings.max_threads), + settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing}; return std::make_shared(params); } diff --git a/tests/queries/0_stateless/01548_parallel_parsing_max_memory.reference b/tests/queries/0_stateless/01548_parallel_parsing_max_memory.reference new file mode 100644 index 00000000000..cf77cd33536 --- /dev/null +++ b/tests/queries/0_stateless/01548_parallel_parsing_max_memory.reference @@ -0,0 +1 @@ +19884108 diff --git a/tests/queries/0_stateless/01548_parallel_parsing_max_memory.sh b/tests/queries/0_stateless/01548_parallel_parsing_max_memory.sh new file mode 100755 index 00000000000..884d5b6e058 --- /dev/null +++ b/tests/queries/0_stateless/01548_parallel_parsing_max_memory.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../shell_config.sh + +yes http://foobarfoobarfoobarfoobarfoobarfoobarfoobar.com | head -c1G > 1g.csv + +$CLICKHOUSE_LOCAL --stacktrace --input_format_parallel_parsing=1 --max_memory_usage=100Mi -q "select count() from file('1g.csv', 'TSV', 'URL String')" \ No newline at end of file