From ca14a29dbeb6a01303986b6fb5a127db66cc3cf4 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 13 Apr 2022 20:39:12 +0000 Subject: [PATCH] Revert reverting "Fix crash in ParallelReadBuffer" --- src/IO/ParallelReadBuffer.cpp | 6 +----- tests/integration/test_storage_s3/test.py | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index f036d6a08c8..79a16347094 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -33,6 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock & /*buffer auto worker = read_workers.emplace_back(std::make_shared(std::move(reader))); + ++active_working_reader; schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }); return true; @@ -203,11 +204,6 @@ bool ParallelReadBuffer::nextImpl() void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker) { - { - std::lock_guard lock{mutex}; - ++active_working_reader; - } - SCOPE_EXIT({ std::lock_guard lock{mutex}; --active_working_reader; diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index e32ddd2782b..87f97e7454a 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -1407,3 +1407,24 @@ def test_insert_select_schema_inference(started_cluster): f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native')" ) assert int(result) == 1 + + +def test_parallel_reading_with_memory_limit(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(1000000)" + ) + + result = instance.query_and_get_error( + f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') settings max_memory_usage=1000" + ) + + assert "Memory limit (for query) exceeded" in result + + time.sleep(5) + + # Check that server didn't crash + result = instance.query("select 1") + assert int(result) == 1