From f06f0e3947a94782d8dedb0f7305941149731fb4 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Thu, 30 May 2019 17:29:30 -0400 Subject: [PATCH] * Adding support for CREATE TEMPORARY LIVE VIEW * Fixing issue with setting _version virtual column --- .../Interpreters/InterpreterCreateQuery.cpp | 2 +- .../Interpreters/InterpreterWatchQuery.cpp | 6 ++++++ dbms/src/Storages/StorageLiveView.cpp | 11 +++++----- dbms/src/Storages/StorageLiveView.h | 20 ++++++++++++------- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 92b1eccb952..aed7a60b805 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -436,7 +436,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const return; } - if (create.temporary) + if (create.temporary && !create.is_live_view) { auto engine_ast = std::make_shared(); engine_ast->name = "Memory"; diff --git a/dbms/src/Interpreters/InterpreterWatchQuery.cpp b/dbms/src/Interpreters/InterpreterWatchQuery.cpp index 4672a42a304..3ba8e2eadaa 100644 --- a/dbms/src/Interpreters/InterpreterWatchQuery.cpp +++ b/dbms/src/Interpreters/InterpreterWatchQuery.cpp @@ -23,6 +23,7 @@ namespace DB namespace ErrorCodes { extern const int UNKNOWN_STORAGE; + extern const int UNKNOWN_TABLE; extern const int TOO_MANY_COLUMNS; } @@ -49,6 +50,11 @@ BlockIO InterpreterWatchQuery::execute() /// Get storage storage = context.tryGetTable(database, table); + if (!storage) + throw Exception("Table " + backQuoteIfNeed(database) + "." + + backQuoteIfNeed(table) + " doesn't exist.", + ErrorCodes::UNKNOWN_TABLE); + /// List of columns to read to execute the query. Names required_columns = storage->getColumns().getNamesOfPhysical(); diff --git a/dbms/src/Storages/StorageLiveView.cpp b/dbms/src/Storages/StorageLiveView.cpp index 7f010c2b3de..799a2193932 100644 --- a/dbms/src/Storages/StorageLiveView.cpp +++ b/dbms/src/Storages/StorageLiveView.cpp @@ -192,12 +192,13 @@ bool StorageLiveView::getNewBlocks() BlockInputStreamPtr data = std::make_shared(select.execute().in); while (Block block = data->read()) { + /// calculate hash before virtual column is added + block.updateHash(hash); /// add result version meta column block.insert({DataTypeUInt64().createColumnConst( block.rows(), getBlocksVersion() + 1)->convertToFullColumnIfConst(), std::make_shared(), "_version"}); - block.updateHash(hash); new_blocks->push_back(block); } @@ -411,8 +412,8 @@ BlockInputStreams StorageLiveView::watch( Poco::FastMutex::ScopedLock lock(mutex); if (!(*blocks_ptr)) { - if (getNewBlocks()) - condition.broadcast(); + if (getNewBlocks()) + condition.broadcast(); } } @@ -435,8 +436,8 @@ BlockInputStreams StorageLiveView::watch( Poco::FastMutex::ScopedLock lock(mutex); if (!(*blocks_ptr)) { - if (getNewBlocks()) - condition.broadcast(); + if (getNewBlocks()) + condition.broadcast(); } } diff --git a/dbms/src/Storages/StorageLiveView.h b/dbms/src/Storages/StorageLiveView.h index 0c6801f9f61..d16a80ee888 100644 --- a/dbms/src/Storages/StorageLiveView.h +++ b/dbms/src/Storages/StorageLiveView.h @@ -88,14 +88,16 @@ public: Poco::FastMutex noUsersThreadMutex; bool noUsersThreadWakeUp{false}; Poco::Condition noUsersThreadCondition; - + /// Get blocks hash + /// must be called with mutex locked String getBlocksHashKey() { if (*blocks_metadata_ptr) return (*blocks_metadata_ptr)->hash; return ""; } - + /// Get blocks version + /// must be called with mutex locked UInt64 getBlocksVersion() { if (*blocks_metadata_ptr) @@ -295,7 +297,6 @@ public: new_blocks = std::make_shared(); new_blocks_metadata = std::make_shared(); new_hash = std::make_shared(); - new_blocks_metadata->version = storage.getBlocksVersion() + 1; } void writeSuffix() override @@ -305,6 +306,15 @@ public: new_hash->get128(key.low, key.high); new_blocks_metadata->hash = key.toHexString(); + new_blocks_metadata->version = storage.getBlocksVersion() + 1; + + for (auto & block : *new_blocks) + { + block.insert({DataTypeUInt64().createColumnConst( + block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(), + std::make_shared(), + "_version"}); + } (*storage.blocks_ptr) = new_blocks; (*storage.blocks_metadata_ptr) = new_blocks_metadata; @@ -319,10 +329,6 @@ public: void write(const Block & block) override { new_blocks->push_back(block); - new_blocks->back().insert({DataTypeUInt64().createColumnConst( - block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(), - std::make_shared(), - "_version"}); block.updateHash(*new_hash); }