From 4351a16103110096ef961a8e8ac271d9d99a88da Mon Sep 17 00:00:00 2001 From: Vxider Date: Thu, 13 Feb 2020 02:08:52 +0800 Subject: [PATCH] multi 'WATCH' and 'TO' support --- dbms/src/Storages/WindowView/BlocksListInputStream.h | 8 ++++---- dbms/src/Storages/WindowView/StorageWindowView.cpp | 10 ++-------- dbms/src/Storages/WindowView/StorageWindowView.h | 6 +++--- .../Storages/WindowView/WindowViewBlockInputStream.h | 9 +++++---- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/dbms/src/Storages/WindowView/BlocksListInputStream.h b/dbms/src/Storages/WindowView/BlocksListInputStream.h index f2fbb37f164..2280f06ac3a 100644 --- a/dbms/src/Storages/WindowView/BlocksListInputStream.h +++ b/dbms/src/Storages/WindowView/BlocksListInputStream.h @@ -16,8 +16,8 @@ class BlocksListInputStream : public IBlockInputStream { public: /// Acquires shared ownership of the blocks vector - BlocksListInputStream(BlocksListPtrs blocks_ptr_, Block header_, std::mutex &mutex_, UInt32 window_upper_bound_) - : blocks(blocks_ptr_), mutex(mutex_), window_upper_bound(window_upper_bound_), header(std::move(header_)) + BlocksListInputStream(BlocksListPtrs blocks_ptr_, Block header_, UInt32 window_upper_bound_) + : blocks(blocks_ptr_), window_upper_bound(window_upper_bound_), header(std::move(header_)) { it_blocks = blocks->begin(); end_blocks = blocks->end(); @@ -46,7 +46,7 @@ protected: IColumn::Filter filter(column_status->size(), 0); auto & data = static_cast(*column_status).getData(); { - std::unique_lock lock(mutex); + // std::unique_lock lock(mutex); for (size_t i = 0; i < column_status->size(); ++i) { if (data[i] == window_upper_bound) @@ -121,7 +121,7 @@ private: std::list::iterator end_blocks; BlocksList::iterator it; BlocksList::iterator end; - std::mutex & mutex; + // std::mutex & mutex; UInt32 window_upper_bound; Block header; }; diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 729d26c5c9c..895577d32d5 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -420,12 +420,6 @@ BlockInputStreams StorageWindowView::watch( size_t /*max_block_size*/, const unsigned /*num_streams*/) { - if (!target_table_id.empty()) - throw Exception("WATCH query is disabled for " + getName() + " when constructed with 'TO' clause.", ErrorCodes::INCORRECT_QUERY); - - if (active_ptr.use_count() > 1) - throw Exception("WATCH query is already attached, WINDOW VIEW only supports attaching one watch query.", ErrorCodes::INCORRECT_QUERY); - ASTWatchQuery & query = typeid_cast(*query_info.query); bool has_limit = false; @@ -519,7 +513,7 @@ StorageWindowView::StorageWindowView( } else { - if(query.storage->engine->name != "MergeTree") + if (query.storage->engine->name != "MergeTree") throw Exception( "The ENGINE of WindowView must be MergeTree family of table engines including the engines with replication support", ErrorCodes::INCORRECT_QUERY); @@ -673,7 +667,7 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr() BlockInputStreams from; auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty(); - BlockInputStreamPtr stream = std::make_shared(mergeable_blocks, sample_block_, mutex,w_upper_bound); + BlockInputStreamPtr stream = std::make_shared(mergeable_blocks, sample_block_, w_upper_bound); from.push_back(std::move(stream)); auto proxy_storage = std::make_shared( StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); diff --git a/dbms/src/Storages/WindowView/StorageWindowView.h b/dbms/src/Storages/WindowView/StorageWindowView.h index 265386178bd..df42fee3434 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.h +++ b/dbms/src/Storages/WindowView/StorageWindowView.h @@ -64,11 +64,11 @@ public: Block getHeader() const; - StoragePtr & getParentStorage() + StoragePtr& getParentStorage() { - if (parent_storage == nullptr) + if (parent_storage == nullptr) parent_storage = global_context.getTable(select_table_id); - return parent_storage; + return parent_storage; } StoragePtr& getInnerStorage() diff --git a/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h b/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h index 4f4decc62db..ef7cb3a2777 100644 --- a/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h +++ b/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h @@ -58,7 +58,7 @@ protected: /// If blocks were never assigned get blocks if (!in_stream) { - std::unique_lock lock(storage->mutex); + // std::unique_lock lock(storage->mutex); in_stream = storage->getNewBlocksInputStreamPtr(); } if (isCancelled() || storage->is_dropped) @@ -69,7 +69,7 @@ protected: res = in_stream->read(); if (!res) { - if (!active) + if (!(*active)) return Block(); if (!end_of_blocks) @@ -79,7 +79,7 @@ protected: return getHeader(); } - std::unique_lock lock(storage->flushTableMutex); + std::unique_lock lock(mutex); UInt64 timestamp_usec = static_cast(Poco::Timestamp().epochMicroseconds()); UInt64 w_end = static_cast(storage->getWindowUpperBound(static_cast(timestamp_usec / 1000000))) * 1000000; storage->condition.wait_for(lock, std::chrono::microseconds(w_end - timestamp_usec)); @@ -89,7 +89,7 @@ protected: return Block(); } { - std::unique_lock lock_(storage->mutex); + // std::unique_lock lock_(storage->mutex); in_stream = storage->getNewBlocksInputStreamPtr(); } @@ -113,6 +113,7 @@ private: std::shared_ptr active; const bool has_limit; const UInt64 limit; + std::mutex mutex; Int64 num_updates = -1; bool end_of_blocks = false; BlockInputStreamPtr in_stream;