Merge pull request #59427 from seandhaynes/master

Use scheduleOrThrow in MergeTree data selection and initialization to avoid deadlocks
Alexey Milovidov 2024-06-06 02:06:22 +02:00 committed by GitHub
commit 9d8bc4d54d
6 changed files with 92 additions and 6 deletions
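
The change follows one pattern at every call site below: scheduleOrThrowOnError waits for a free slot in the global thread pool with no upper bound, while scheduleOrThrow with an explicit wait limit gives up once the timeout expires and throws a "Cannot schedule a task" error (CANNOT_SCHEDULE_TASK). A minimal sketch of the new call shape, assuming the ThreadPool interface exactly as it appears in the hunks below; the wrapper name scheduleWithBoundedWait is illustrative and not part of this PR:

/// Sketch only: depends on ClickHouse's Common/ThreadPool.h and is not code
/// introduced by this PR; the wrapper name is hypothetical.
#include <Common/ThreadPool.h>
#include <cstdint>
#include <functional>

void scheduleWithBoundedWait(ThreadPool & pool, std::function<void()> job, uint64_t timeout_us)
{
    /// Old call: pool.scheduleOrThrowOnError(std::move(job)) blocks until the global
    /// pool has a free thread, which never happens once max_thread_pool_size is
    /// saturated by queries that are themselves waiting for more threads.
    /// New call: bound the wait; when it expires the pool throws CANNOT_SCHEDULE_TASK
    /// instead of hanging the caller.
    pool.scheduleOrThrow(std::move(job), Priority{}, timeout_us);
}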


@@ -670,7 +670,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
     for (auto it = metadata_files.begin(); it < metadata_files.end(); std::advance(it, batch_size))
     {
         std::span batch{it, std::min(std::next(it, batch_size), metadata_files.end())};
-        pool.scheduleOrThrowOnError(
+        pool.scheduleOrThrow(
             [batch, &process_metadata_file, &process_tmp_drop_metadata_file]() mutable
             {
                 setThreadName("DatabaseOnDisk");
@@ -679,7 +679,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
                     process_metadata_file(file.first);
                 else
                     process_tmp_drop_metadata_file(file.first);
-            });
+            }, Priority{}, getContext()->getSettingsRef().lock_acquire_timeout.totalMicroseconds());
     }
     pool.wait();
 }
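
If that wait expires while metadata is being loaded, the call above now raises instead of blocking server startup, and the failure can be recognised by its error code. A hedged sketch of a caller that distinguishes this case, reusing the local ErrorCodes declaration style from the hunks in this PR; the wrapper tryScheduleWithTimeout is illustrative and not part of the change:

/// Illustrative only; not code from this PR. Returns false when the global pool
/// stayed saturated for the whole timeout, which previously meant blocking forever.
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <cstdint>
#include <functional>

namespace DB::ErrorCodes
{
    extern const int CANNOT_SCHEDULE_TASK;
}

bool tryScheduleWithTimeout(ThreadPool & pool, std::function<void()> job, uint64_t timeout_us)
{
    try
    {
        pool.scheduleOrThrow(std::move(job), Priority{}, timeout_us);
        return true;
    }
    catch (const DB::Exception & e)
    {
        if (e.code() != DB::ErrorCodes::CANNOT_SCHEDULE_TASK)
            throw;
        return false;
    }
}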


@@ -67,6 +67,8 @@ namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
     extern const int LOGICAL_ERROR;
+    extern const int CANNOT_COMPILE_REGEXP;
+    extern const int CANNOT_OPEN_FILE;
     extern const int CANNOT_PARSE_TEXT;
     extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
     extern const int CANNOT_PARSE_QUOTED_STRING;
@@ -78,8 +80,7 @@ namespace ErrorCodes
     extern const int CANNOT_PARSE_IPV6;
     extern const int CANNOT_PARSE_UUID;
     extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
-    extern const int CANNOT_OPEN_FILE;
-    extern const int CANNOT_COMPILE_REGEXP;
+    extern const int CANNOT_SCHEDULE_TASK;
     extern const int DUPLICATE_COLUMN;
     extern const int ILLEGAL_COLUMN;
     extern const int THERE_IS_NO_COLUMN;
@@ -267,6 +268,10 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
     {
         return HTTPResponse::HTTP_REQUEST_TIMEOUT;
     }
+    else if (exception_code == ErrorCodes::CANNOT_SCHEDULE_TASK)
+    {
+        return HTTPResponse::HTTP_SERVICE_UNAVAILABLE;
+    }
     return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR;
 }
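
For HTTP clients the practical effect of the branch above is that a query rejected because the server could not obtain worker threads is reported as overload rather than as an internal error. A sketch restating that mapping as a free function; exceptionCodeToHTTPStatus itself is static to the handler source file, so the wiring here is illustrative, not the actual HTTPHandler code:

/// Illustrative restatement of the mapping added above; not the actual handler code.
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerResponse.h>

namespace DB::ErrorCodes
{
    extern const int CANNOT_SCHEDULE_TASK;
}

void setStatusForQueryError(const DB::Exception & e, Poco::Net::HTTPServerResponse & response)
{
    /// 503 tells clients the failure is transient overload (retry later),
    /// instead of the generic 500 used for internal errors.
    if (e.code() == DB::ErrorCodes::CANNOT_SCHEDULE_TASK)
        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
    else
        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
}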


@@ -760,9 +760,16 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
         CurrentMetrics::MergeTreeDataSelectExecutorThreadsScheduled,
         num_threads);
+    /// Instances of ThreadPool "borrow" threads from the global thread pool.
+    /// We intentionally use scheduleOrThrow here to avoid a deadlock.
+    /// For example, queries can already be running with threads from the
+    /// global pool, and if we saturate max_thread_pool_size whilst requesting
+    /// more in this loop, queries will block infinitely.
+    /// So we wait until lock_acquire_timeout, and then raise an exception.
     for (size_t part_index = 0; part_index < parts.size(); ++part_index)
     {
-        pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
+        pool.scheduleOrThrow([&, part_index, thread_group = CurrentThread::getGroup()]
         {
             setThreadName("MergeTreeIndex");
@@ -774,7 +781,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
                 CurrentThread::attachToGroupIfDetached(thread_group);
             process_part(part_index);
-        });
+        }, Priority{}, context->getSettingsRef().lock_acquire_timeout.totalMicroseconds());
     }
     pool.wait();
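
The comment block added above describes the deadlock in terms of threads borrowed from the global pool. As a self-contained analogue of that reasoning (nothing below is ClickHouse code; a C++20 counting semaphore stands in for the max_thread_pool_size slots, and the names and numbers are made up):

// Standalone illustration, compile with -std=c++20. The semaphore models the
// global thread pool's capacity.
#include <chrono>
#include <iostream>
#include <semaphore>
#include <stdexcept>

std::counting_semaphore<2> global_slots(2); // pretend max_thread_pool_size == 2

// Old behaviour: wait for a slot with no bound. If every slot is held by work
// that is itself waiting here, nobody ever makes progress.
void schedule_or_wait_forever() { global_slots.acquire(); }

// New behaviour (analogue of scheduleOrThrow(job, Priority{}, timeout_us)):
// give up after the timeout and raise instead of hanging.
void schedule_or_throw(std::chrono::microseconds timeout)
{
    if (!global_slots.try_acquire_for(timeout))
        throw std::runtime_error("Cannot schedule a task: pool is saturated");
}

int main()
{
    // Saturate the pool, as already-running queries would.
    global_slots.acquire();
    global_slots.acquire();

    try
    {
        // Asking for one more worker now fails fast instead of deadlocking.
        schedule_or_throw(std::chrono::microseconds(100'000));
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }
    return 0;
}

Run as written, the program prints the error message after roughly 100 ms instead of hanging, which is the behaviour the PR gives to saturated queries.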


@@ -0,0 +1,6 @@
<clickhouse>
<!-- Run with a small thread pool size so it's easier to saturate -->
<max_thread_pool_size>300</max_thread_pool_size>
<thread_pool_queue_size>1</thread_pool_queue_size>
<background_schedule_pool_size>128</background_schedule_pool_size>
</clickhouse>


@@ -0,0 +1,68 @@
import concurrent.futures

import pytest

from helpers.cluster import ClickHouseCluster

MAX_THREADS = 60

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    "node1",
    macros={"cluster": "test-cluster", "replica": "node1"},
    main_configs=["configs/settings.xml"],
    with_zookeeper=True,
)


def prepare_cluster():
    node1.query("DROP TABLE IF EXISTS test_threads_busy SYNC")
    node1.query(
        """
        CREATE TABLE test_threads_busy(d Date, i Int64, s String) ENGINE=MergeTree PARTITION BY toYYYYMMDD(d) ORDER BY d
        """
    )


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def do_slow_select():
    # Do a bunch of slow queries that use a large number of threads to saturate max_thread_pool_size
    # explicitly set max_threads as otherwise it's relative to the number of CPU cores
    query = (
        "SELECT d, i, s, sleepEachRow(3) from test_threads_busy SETTINGS max_threads=40"
    )
    node1.query(query)


def test_query_exception_on_thread_pool_full(started_cluster):
    prepare_cluster()

    # Generate some sample data so sleepEachRow in do_slow_select works
    node1.query(
        f"INSERT INTO test_threads_busy VALUES ('2024-01-01', 1, 'thread-test')"
    )

    futures = []
    errors = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        for _ in range(MAX_THREADS):
            futures.append(executor.submit(do_slow_select))

        for f in futures:
            try:
                f.result()
            except Exception as err:
                errors.append(str(err))

    assert len(errors) > 0, "Should be 'Cannot schedule a task' exceptions"
    assert all(
        "Cannot schedule a task" in err for err in errors
    ), "Query threads are stuck, or returned an unexpected error"