Merge pull request #34866 from CurtizJ/async-insert-table-function

Fix async inserts to table functions
This commit is contained in:
Anton Popov 2022-03-01 19:45:22 +03:00 committed by GitHub
commit 82d24f06eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 2 deletions

View File

@ -184,7 +184,10 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
if (!FormatFactory::instance().isInputFormat(insert_query.format)) if (!FormatFactory::instance().isInputFormat(insert_query.format))
throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Unknown input format {}", insert_query.format); throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Unknown input format {}", insert_query.format);
query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames()); /// For table functions we check access while executing
/// InterpreterInsertQuery::getTable() -> ITableFunction::execute().
if (insert_query.table_id)
query_context->checkAccess(AccessType::INSERT, insert_query.table_id, sample_block.getNames());
String bytes; String bytes;
{ {
@ -411,7 +414,7 @@ try
}; };
std::shared_ptr<ISimpleTransform> adding_defaults_transform; std::shared_ptr<ISimpleTransform> adding_defaults_transform;
if (insert_context->getSettingsRef().input_format_defaults_for_omitted_fields) if (insert_context->getSettingsRef().input_format_defaults_for_omitted_fields && insert_query.table_id)
{ {
StoragePtr storage = DatabaseCatalog::instance().getTable(insert_query.table_id, insert_context); StoragePtr storage = DatabaseCatalog::instance().getTable(insert_query.table_id, insert_context);
auto metadata_snapshot = storage->getInMemoryMetadataPtr(); auto metadata_snapshot = storage->getInMemoryMetadataPtr();

View File

@ -295,6 +295,9 @@ BlockIO InterpreterInsertQuery::execute()
auto metadata_snapshot = table->getInMemoryMetadataPtr(); auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot); auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);
/// For table functions we check access while executing
/// getTable() -> ITableFunction::execute().
if (!query.table_function) if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames()); getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());

View File

@ -0,0 +1,2 @@
1 aaa
2 bbb

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS t_async_insert_table_function;
CREATE TABLE t_async_insert_table_function (id UInt32, s String) ENGINE = Memory;
SET async_insert = 1;
INSERT INTO function remote('127.0.0.1', currentDatabase(), t_async_insert_table_function) values (1, 'aaa') (2, 'bbb');
SELECT * FROM t_async_insert_table_function ORDER BY id;
DROP TABLE t_async_insert_table_function;