diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
index f295fe00299..7a3de9dca58 100644
--- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
@@ -147,6 +147,13 @@ void ParallelParsingInputFormat::onBackgroundException(size_t offset)
 
 Chunk ParallelParsingInputFormat::generate()
 {
+    /// Delayed launching of segmenting thread
+    if (!parsing_started.exchange(true))
+    {
+        segmentator_thread = ThreadFromGlobalPool(
+            &ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup());
+    }
+
     if (isCancelled() || parsing_finished)
     {
         /**
diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h
index 559507055b9..dafaf9bed72 100644
--- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h
+++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h
@@ -97,9 +97,6 @@ public:
         // bump into reader thread on wraparound.
         processing_units.resize(params.max_threads + 2);
 
-        segmentator_thread = ThreadFromGlobalPool(
-            &ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup());
-
         LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
     }
 
@@ -205,6 +202,7 @@ private:
 
     Poco::Event first_parser_finished;
 
+    std::atomic<bool> parsing_started{false};
     std::atomic<bool> parsing_finished{false};
 
     /// There are multiple "parsers", that's why we use thread pool.
diff --git a/tests/queries/0_stateless/01848_http_insert_segfault.reference b/tests/queries/0_stateless/01848_http_insert_segfault.reference
new file mode 100644
index 00000000000..587579af915
--- /dev/null
+++ b/tests/queries/0_stateless/01848_http_insert_segfault.reference
@@ -0,0 +1 @@
+Ok.
diff --git a/tests/queries/0_stateless/01848_http_insert_segfault.sh b/tests/queries/0_stateless/01848_http_insert_segfault.sh
new file mode 100755
index 00000000000..a263ded44eb
--- /dev/null
+++ b/tests/queries/0_stateless/01848_http_insert_segfault.sh
@@ -0,0 +1,8 @@
+#!/usr/bin/env bash
+
+    CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+    # shellcheck source=../shell_config.sh
+    . "$CUR_DIR"/../shell_config.sh
+
+    ${CLICKHOUSE_LOCAL} -q "select col1, initializeAggregation('argMaxState', col2, insertTime) as col2, now() as insertTime FROM generateRandom('col1 String, col2 Array(Float64)') LIMIT 1000000 FORMAT CSV" | curl -s 'http://localhost:8123/?query=INSERT%20INTO%20non_existing_table%20SELECT%20col1%2C%20initializeAggregation(%27argMaxState%27%2C%20col2%2C%20insertTime)%20as%20col2%2C%20now()%20as%20insertTime%20FROM%20input(%27col1%20String%2C%20col2%20Array(Float64)%27)%20FORMAT%20CSV' --data-binary @- | grep -q "Table default.non_existing_table doesn't exist" && echo 'Ok.' || echo 'FAIL' ||:
+
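
The patch moves the segmentator thread launch out of the ParallelParsingInputFormat constructor and into generate(), guarded by parsing_started.exchange(true) so the thread is started at most once and only when data is actually pulled. Below is a minimal standalone sketch of that "launch once, lazily" pattern; it is not part of the patch. It uses std::thread in place of ClickHouse's internal ThreadFromGlobalPool, and the names LazyWorker, generate, and run are hypothetical.

    // Illustrative sketch, not the ClickHouse implementation: one-shot lazy
    // launch of a background thread guarded by std::atomic<bool>::exchange.
    #include <atomic>
    #include <iostream>
    #include <thread>

    class LazyWorker
    {
    public:
        // Called from the consumer side; the background thread is started only
        // on the first call, so construction alone never spawns a thread.
        void generate()
        {
            if (!started.exchange(true))    // exactly one caller observes 'false'
                worker = std::thread([this] { run(); });
            std::cout << "generate() called\n";
        }

        ~LazyWorker()
        {
            if (worker.joinable())
                worker.join();
        }

    private:
        void run() { std::cout << "background thread running\n"; }

        std::atomic<bool> started{false};
        std::thread worker;
    };

    int main()
    {
        LazyWorker w;   // no thread started yet
        w.generate();   // first call launches the background thread
        w.generate();   // subsequent calls do not
    }

The point of the exchange-based guard is that when the input pipeline is cancelled or fails before any chunk is requested (as in the HTTP INSERT into a non-existing table exercised by the new test), no segmentator thread exists to race with object destruction.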