dbms: fixed error [#METR-17286].

This commit is contained in:
Alexey Milovidov 2015-09-18 03:46:36 +03:00
parent 7a7a2ac6e0
commit fa9553307e
2 changed files with 25 additions and 4 deletions

View File

@ -22,6 +22,11 @@ public:
: context(context_), query_ptr(query_ptr_)
{
storage = context.getTable(database, table);
/** TODO Это очень важная строчка. При любой вставке в таблицу один из stream-ов должен владеть lock-ом.
* Хотя сейчас любая вставка в таблицу делается через PushingToViewsBlockOutputStream,
* но ясно, что здесь - не лучшее место для этой функциональности.
*/
addTableLock(storage->lockStructure(true));
Dependencies dependencies = context.getDependencies(database, table);

View File

@ -6,8 +6,11 @@
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DB/DataStreams/ProhibitColumnsBlockOutputStream.h>
#include <DB/DataStreams/MaterializingBlockOutputStream.h>
#include <DB/DataStreams/AddingDefaultBlockOutputStream.h>
#include <DB/DataStreams/PushingToViewsBlockOutputStream.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTNameTypePair.h>
@ -251,11 +254,24 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
/// Если запрос CREATE SELECT, то вставим в таблицу данные
if (create.select && storage_name != "View" && (storage_name != "MaterializedView" || create.is_populate))
{
auto table_lock = res->lockStructure(true);
/// Также см. InterpreterInsertQuery.
BlockOutputStreamPtr out{
new ProhibitColumnsBlockOutputStream{
new AddingDefaultBlockOutputStream{
new MaterializingBlockOutputStream{
new PushingToViewsBlockOutputStream{create.database, create.table, context, query_ptr}
},
columns, column_defaults, context, context.getSettingsRef().strict_insert_defaults
},
materialized_columns
}
};
BlockIO io;
io.in_sample = select_sample;
io.in = new NullAndDoCopyBlockInputStream(
new MaterializingBlockInputStream(interpreter_select->execute().in),
res->write(query_ptr, context.getSettingsRef()));
io.in = new NullAndDoCopyBlockInputStream(interpreter_select->execute().in, out);
return io;
}