fix deadlock in concurrent async inserts and truncates

This commit is contained in:
Anton Popov 2021-09-27 19:39:32 +03:00
parent 70dc43a72d
commit 9a58e4a8ba
4 changed files with 19 additions and 7 deletions

View File

@ -34,6 +34,10 @@ using RWLock = std::shared_ptr<RWLockImpl>;
/// - SELECT thread 1 locks in the Read mode
/// - ALTER tries to lock in the Write mode (waits for SELECT thread 1)
/// - SELECT thread 2 tries to lock in the Read mode (waits for ALTER)
///
/// NOTE: it is dangerous to acquire lock with NO_QUERY, because FastPath doesn't
/// exist for this case and deadlock, described in previous note,
/// may accur in case of recursive locking.
class RWLockImpl : public std::enable_shared_from_this<RWLockImpl>
{
public:

View File

@ -148,7 +148,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
}
}
void AsynchronousInsertQueue::scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
{
/// Wrap 'unique_ptr' with 'shared_ptr' to make this
/// lambda copyable and allow to save it to the thread pool.
@ -219,7 +219,7 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
data->entries.size(), data->size, queryToString(it->first.query));
if (data->size > max_data_size)
scheduleProcessDataJob(it->first, std::move(data), getContext());
scheduleDataProcessingJob(it->first, std::move(data), getContext());
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)
@ -264,7 +264,7 @@ void AsynchronousInsertQueue::busyCheck()
auto lag = std::chrono::steady_clock::now() - elem->data->first_update;
if (lag >= busy_timeout)
scheduleProcessDataJob(key, std::move(elem->data), getContext());
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::milliseconds>(busy_timeout - lag));
}
@ -286,7 +286,7 @@ void AsynchronousInsertQueue::staleCheck()
auto lag = std::chrono::steady_clock::now() - elem->data->last_update;
if (lag >= stale_timeout)
scheduleProcessDataJob(key, std::move(elem->data), getContext());
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
}
}
}
@ -367,6 +367,10 @@ try
insert_context->makeQueryContext();
insert_context->setSettings(key.settings);
/// Set initial_query_id, because it's used in InterpreterInsertQuery for table lock.
insert_context->getClientInfo().query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
insert_context->setCurrentQueryId("");
InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns);
auto sinks = interpreter.getSinks();
assert(sinks.size() == 1);
@ -434,6 +438,10 @@ catch (const Exception & e)
{
finishWithException(key.query, data->entries, e);
}
catch (const Poco::Exception & e)
{
finishWithException(key.query, data->entries, e);
}
catch (const std::exception & e)
{
finishWithException(key.query, data->entries, e);

View File

@ -134,7 +134,7 @@ private:
/// Should be called with shared or exclusively locked 'rwlock'.
void pushImpl(InsertData::EntryPtr entry, QueueIterator it);
void scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
template <typename E>

View File

@ -55,7 +55,7 @@ export -f insert1
export -f insert2
export -f select1
export -f select2
# export -f truncate1
export -f truncate1
for _ in {1..5}; do
timeout $TIMEOUT bash -c insert1 &
@ -64,7 +64,7 @@ done
timeout $TIMEOUT bash -c select1 &
timeout $TIMEOUT bash -c select2 &
# timeout $TIMEOUT bash -c truncate1 &
timeout $TIMEOUT bash -c truncate1 &
wait
echo "OK"