Merge pull request #36212 from Avogar/revert-fix

Revert reverting "Fix crash in ParallelReadBuffer"
Kruglov Pavel 2022-04-14 11:26:31 +02:00 committed by GitHub
commit 4f9ee879d4
2 changed files with 22 additions and 5 deletions
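
This restores the ParallelReadBuffer fix: the ++active_working_reader increment moves out of the reader thread body in readerThreadFunction and into addReaderToPool, so the counter is already non-zero at the moment the worker task is scheduled. It also adds an integration test that drives a parallel read into a memory-limit error and then checks that the server is still alive.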

src/IO/ParallelReadBuffer.cpp

@@ -33,6 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer
     auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));

+    ++active_working_reader;
     schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });

     return true;
 }
@@ -203,11 +204,6 @@ bool ParallelReadBuffer::nextImpl()

 void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
 {
-    {
-        std::lock_guard lock{mutex};
-        ++active_working_reader;
-    }
-
     SCOPE_EXIT({
         std::lock_guard lock{mutex};
         --active_working_reader;

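The bug being re-fixed is a small scheduling race, easiest to see in isolation. Below is a minimal, hypothetical sketch, not ClickHouse code: schedule, wait_all_readers, and the plain atomic counter are stand-ins for the pool scheduling, the teardown path, and the mutex-guarded counter in the diff above. The point it illustrates: if the increment happens inside the scheduled task, a teardown that waits for active_working_reader == 0 can complete before the task has even started.

    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <thread>

    // Counter of readers that are scheduled or running, as in ParallelReadBuffer.
    std::atomic<int> active_working_reader{0};

    // Hands `job` to a worker thread. The counter is bumped *before* the thread
    // exists, so no teardown path can observe 0 while a job is still pending.
    template <typename Job>
    void schedule(Job job)
    {
        ++active_working_reader;             // the restored fix: increment up front
        std::thread([job = std::move(job)]() mutable
        {
            job();
            --active_working_reader;         // mirrors the SCOPE_EXIT decrement
        }).detach();
    }

    // Stand-in for the teardown path that waits for all readers to finish.
    void wait_all_readers()
    {
        while (active_working_reader.load() != 0)
            std::this_thread::yield();
    }

    int main()
    {
        schedule([] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); });

        // With the pre-fix placement (increment inside the job itself), this wait
        // could return before the thread had started and incremented the counter,
        // and teardown would then race with a reader that was still about to run.
        wait_all_readers();
        std::cout << "all readers finished\n";
    }

Moving the increment to the scheduling site, as the first hunk does, makes "counter is zero" equivalent to "no reader is pending or running".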
tests/integration/test_storage_s3/test.py

@@ -1407,3 +1407,24 @@ def test_insert_select_schema_inference(started_cluster):
         f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native')"
     )
     assert int(result) == 1
+
+
+def test_parallel_reading_with_memory_limit(started_cluster):
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]
+
+    instance.query(
+        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(1000000)"
+    )
+
+    result = instance.query_and_get_error(
+        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') settings max_memory_usage=1000"
+    )
+
+    assert "Memory limit (for query) exceeded" in result
+
+    time.sleep(5)
+
+    # Check that server didn't crash
+    result = instance.query("select 1")
+    assert int(result) == 1
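
Note the shape of the new test: the tiny max_memory_usage forces the parallel read to fail mid-flight, and the time.sleep(5) before the trivial "select 1" probe gives any leftover reader threads time to take the server down if the counter race were still present. The probe succeeding is the actual regression check.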