This commit is contained in:
Nikita Mikhaylov 2021-04-26 18:50:40 +03:00
parent 478d540617
commit 9a70136bcf
4 changed files with 17 additions and 3 deletions

View File

@ -147,6 +147,13 @@ void ParallelParsingInputFormat::onBackgroundException(size_t offset)
Chunk ParallelParsingInputFormat::generate() Chunk ParallelParsingInputFormat::generate()
{ {
/// Delayed launching of segmenting thread
if (!parsing_started.exchange(true))
{
segmentator_thread = ThreadFromGlobalPool(
&ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup());
}
if (isCancelled() || parsing_finished) if (isCancelled() || parsing_finished)
{ {
/** /**

View File

@ -97,9 +97,6 @@ public:
// bump into reader thread on wraparound. // bump into reader thread on wraparound.
processing_units.resize(params.max_threads + 2); processing_units.resize(params.max_threads + 2);
segmentator_thread = ThreadFromGlobalPool(
&ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup());
LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used"); LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
} }
@ -205,6 +202,7 @@ private:
Poco::Event first_parser_finished; Poco::Event first_parser_finished;
std::atomic<bool> parsing_started{false};
std::atomic<bool> parsing_finished{false}; std::atomic<bool> parsing_finished{false};
/// There are multiple "parsers", that's why we use thread pool. /// There are multiple "parsers", that's why we use thread pool.

View File

@ -0,0 +1 @@
Ok.

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_LOCAL} -q "select col1, initializeAggregation('argMaxState', col2, insertTime) as col2, now() as insertTime FROM generateRandom('col1 String, col2 Array(Float64)') LIMIT 1000000 FORMAT CSV" | curl -s 'http://localhost:8123/?query=INSERT%20INTO%20non_existing_table%20SELECT%20col1%2C%20initializeAggregation(%27argMaxState%27%2C%20col2%2C%20insertTime)%20as%20col2%2C%20now()%20as%20insertTime%20FROM%20input(%27col1%20String%2C%20col2%20Array(Float64)%27)%20FORMAT%20CSV' --data-binary @- | grep -q "Table default.non_existing_table doesn't exist" && echo 'Ok.' || echo 'FAIL' ||: