Merge pull request #29444 from CurtizJ/fix-deadlock-async-inserts

Fix deadlock in concurrent async inserts and truncates
This commit is contained in:
alexey-milovidov 2021-09-29 00:56:29 +03:00 committed by GitHub
commit f6db7552b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 7 deletions

View File

@ -34,6 +34,10 @@ using RWLock = std::shared_ptr<RWLockImpl>;
/// - SELECT thread 1 locks in the Read mode
/// - ALTER tries to lock in the Write mode (waits for SELECT thread 1)
/// - SELECT thread 2 tries to lock in the Read mode (waits for ALTER)
///
/// NOTE: it is dangerous to acquire lock with NO_QUERY, because FastPath doesn't
/// exist for this case and deadlock, described in previous note,
/// may accur in case of recursive locking.
class RWLockImpl : public std::enable_shared_from_this<RWLockImpl>
{
public:

View File

@ -150,7 +150,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
}
}
void AsynchronousInsertQueue::scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
{
/// Wrap 'unique_ptr' with 'shared_ptr' to make this
/// lambda copyable and allow to save it to the thread pool.
@ -221,7 +221,7 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
data->entries.size(), data->size, queryToString(it->first.query));
if (data->size > max_data_size)
scheduleProcessDataJob(it->first, std::move(data), getContext());
scheduleDataProcessingJob(it->first, std::move(data), getContext());
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)
@ -266,7 +266,7 @@ void AsynchronousInsertQueue::busyCheck()
auto lag = std::chrono::steady_clock::now() - elem->data->first_update;
if (lag >= busy_timeout)
scheduleProcessDataJob(key, std::move(elem->data), getContext());
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::milliseconds>(busy_timeout - lag));
}
@ -288,7 +288,7 @@ void AsynchronousInsertQueue::staleCheck()
auto lag = std::chrono::steady_clock::now() - elem->data->last_update;
if (lag >= stale_timeout)
scheduleProcessDataJob(key, std::move(elem->data), getContext());
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
}
}
}
@ -369,6 +369,10 @@ try
insert_context->makeQueryContext();
insert_context->setSettings(key.settings);
/// Set initial_query_id, because it's used in InterpreterInsertQuery for table lock.
insert_context->getClientInfo().query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
insert_context->setCurrentQueryId("");
InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns, false, false, true);
auto pipeline = interpreter.execute().pipeline;
assert(pipeline.pushing());
@ -431,6 +435,10 @@ catch (const Exception & e)
{
finishWithException(key.query, data->entries, e);
}
catch (const Poco::Exception & e)
{
finishWithException(key.query, data->entries, e);
}
catch (const std::exception & e)
{
finishWithException(key.query, data->entries, e);

View File

@ -131,7 +131,7 @@ private:
/// Should be called with shared or exclusively locked 'rwlock'.
void pushImpl(InsertData::EntryPtr entry, QueueIterator it);
void scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
template <typename E>

View File

@ -55,7 +55,7 @@ export -f insert1
export -f insert2
export -f select1
export -f select2
# export -f truncate1
export -f truncate1
for _ in {1..5}; do
timeout $TIMEOUT bash -c insert1 &
@ -64,7 +64,7 @@ done
timeout $TIMEOUT bash -c select1 &
timeout $TIMEOUT bash -c select2 &
# timeout $TIMEOUT bash -c truncate1 &
timeout $TIMEOUT bash -c truncate1 &
wait
echo "OK"