Fixed possible deadlock in distributed queries

This commit is contained in:
Alexey Milovidov 2019-08-31 15:18:14 +03:00
parent 783df7a5c5
commit aac0b27daa
7 changed files with 15 additions and 5 deletions

View File

@ -26,7 +26,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream, * Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality. * but it's clear that here is not the best place for this functionality.
*/ */
addTableLock(storage->lockStructureForShare(true, context.getCurrentQueryId())); addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children /// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks

View File

@ -65,7 +65,7 @@ FunctionBasePtr FunctionBuilderJoinGet::buildImpl(const ColumnsWithTypeAndName &
auto join = storage_join->getJoin(); auto join = storage_join->getJoin();
DataTypes data_types(arguments.size()); DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructureForShare(false, context.getCurrentQueryId()); auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
for (size_t i = 0; i < arguments.size(); ++i) for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type; data_types[i] = arguments[i].type;

View File

@ -1162,6 +1162,12 @@ String Context::getCurrentQueryId() const
} }
String Context::getInitialQueryId() const
{
return client_info.initial_query_id;
}
void Context::setCurrentDatabase(const String & name) void Context::setCurrentDatabase(const String & name)
{ {
auto lock = getLock(); auto lock = getLock();

View File

@ -264,6 +264,10 @@ public:
String getCurrentDatabase() const; String getCurrentDatabase() const;
String getCurrentQueryId() const; String getCurrentQueryId() const;
/// Id of initiating query for distributed queries; or current query id if it's not a distributed query.
String getInitialQueryId() const;
void setCurrentDatabase(const String & name); void setCurrentDatabase(const String & name);
void setCurrentQueryId(const String & query_id); void setCurrentQueryId(const String & query_id);

View File

@ -92,7 +92,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = context.getTable(database_name, table_name); table = context.getTable(database_name, table_name);
} }
auto table_lock = table->lockStructureForShare(false, context.getCurrentQueryId()); auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
columns = table->getColumns(); columns = table->getColumns();
} }

View File

@ -100,7 +100,7 @@ BlockIO InterpreterInsertQuery::execute()
checkAccess(query); checkAccess(query);
StoragePtr table = getTable(query); StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId()); auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
/// We create a pipeline of several streams, into which we will write data. /// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out; BlockOutputStreamPtr out;

View File

@ -294,7 +294,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
} }
if (storage) if (storage)
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId()); table_lock = storage->lockStructureForShare(false, context.getInitialQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList()); query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());