diff --git a/dbms/src/DataStreams/BlocksBlockInputStream.h b/dbms/src/DataStreams/BlocksBlockInputStream.h new file mode 100644 index 00000000000..ad0d37da622 --- /dev/null +++ b/dbms/src/DataStreams/BlocksBlockInputStream.h @@ -0,0 +1,51 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#include + + +namespace DB +{ + +/** A stream of blocks from a shared vector of blocks + */ +class BlocksBlockInputStream : public IBlockInputStream +{ +public: + /// Acquires shared ownership of the blocks vector + BlocksBlockInputStream(std::shared_ptr blocks_ptr_, Block header) + : blocks_ptr(blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()), header(header) {} + + String getName() const override { return "Blocks"; } + + Block getHeader() const override { return header; } + +protected: + Block readImpl() override + { + if (it == end) + return Block(); + + Block res = *it; + ++it; + return res; + } + +private: + std::shared_ptr blocks_ptr; + Blocks::iterator it; + const Blocks::iterator end; + Block header; +}; + +} diff --git a/dbms/src/DataStreams/LiveViewBlockInputStream.h b/dbms/src/DataStreams/LiveViewBlockInputStream.h new file mode 100644 index 00000000000..3bb0a52efae --- /dev/null +++ b/dbms/src/DataStreams/LiveViewBlockInputStream.h @@ -0,0 +1,209 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#include + +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** + */ + +class LiveViewBlockInputStream : public IBlockInputStream +{ + +using NonBlockingResult = std::pair; + +public: + ~LiveViewBlockInputStream() override + { + /// Start storage no users thread + /// if we are the last active user + if (!storage.is_dropped && blocks_ptr.use_count() < 3) + storage.startNoUsersThread(); + } + /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update + /// and LIMIT 0 just returns data without waiting for any updates + LiveViewBlockInputStream(StorageLiveView & storage_, std::shared_ptr blocks_ptr_, std::shared_ptr active_ptr_, Poco::Condition & condition_, Poco::FastMutex & mutex_, + int64_t length_, const UInt64 & heartbeat_delay_) + : storage(storage_), blocks_ptr(blocks_ptr_), active_ptr(active_ptr_), condition(condition_), mutex(mutex_), length(length_ + 1), heartbeat_delay(heartbeat_delay_), blocks_hash("") + { + /// grab active pointer + active = active_ptr.lock(); + } + + String getName() const override { return "LiveViewBlockInputStream"; } + + void cancel(bool kill) override + { + if (isCancelled() || storage.is_dropped) + return; + IBlockInputStream::cancel(kill); + Poco::FastMutex::ScopedLock lock(mutex); + condition.broadcast(); + } + + Block getHeader() const override { return storage.getHeader(); } + + void refresh() + { + if (active && blocks && it == end) + it = blocks->begin(); + } + + void suspend() + { + active.reset(); + } + + void resume() + { + active = active_ptr.lock(); + { + if (!blocks || blocks.get() != (*blocks_ptr).get()) + blocks = (*blocks_ptr); + } + it = blocks->begin(); + begin = blocks->begin(); + end = blocks->end(); + } + + NonBlockingResult tryRead() + { + return tryRead_(false); + } + +protected: + Block readImpl() override + { + /// try reading + return tryRead_(true).first; + } + + /** tryRead method attempts to read a block in either blocking + * or non-blocking mode. If blocking is set to false + * then method return empty block with flag set to false + * to indicate that method would block to get the next block. + */ + NonBlockingResult tryRead_(bool blocking) + { + Block res; + + if (length == 0) + { + return { Block(), true }; + } + /// If blocks were never assigned get blocks + if (!blocks) + { + Poco::FastMutex::ScopedLock lock(mutex); + if (!active) + return { Block(), false }; + blocks = (*blocks_ptr); + it = blocks->begin(); + begin = blocks->begin(); + end = blocks->end(); + } + + if (isCancelled() || storage.is_dropped) + { + return { Block(), true }; + } + + if (it == end) + { + { + Poco::FastMutex::ScopedLock lock(mutex); + if (!active) + return { Block(), false }; + /// If we are done iterating over our blocks + /// and there are new blocks availble then get them + if (blocks.get() != (*blocks_ptr).get()) + { + blocks = (*blocks_ptr); + it = blocks->begin(); + begin = blocks->begin(); + end = blocks->end(); + } + /// No new blocks available wait for new ones + else + { + if (!blocking) + { + return { Block(), false }; + } + while (true) + { + bool signaled = condition.tryWait(mutex, std::max((UInt64)0, heartbeat_delay - ((UInt64)timestamp.epochMicroseconds() - last_event_timestamp)) / 1000); + + if (isCancelled() || storage.is_dropped) + { + return { Block(), true }; + } + if (signaled) + { + break; + } + else + { + //hashmap["blocks"] = blocks_hash; + last_event_timestamp = (UInt64)timestamp.epochMicroseconds(); + //heartbeat(Heartbeat(last_event_timestamp, std::move(hashmap))); + } + } + } + } + return tryRead_(blocking); + } + + res = *it; + + ++it; + + if (it == end) + { + if (length > 0) + --length; + } + + last_event_timestamp = (UInt64)timestamp.epochMicroseconds(); + return { res, true }; + } + +private: + StorageLiveView & storage; + std::shared_ptr blocks_ptr; + std::weak_ptr active_ptr; + std::shared_ptr active; + BlocksPtr blocks; + Blocks::iterator it; + Blocks::iterator end; + Blocks::iterator begin; + Poco::Condition & condition; + Poco::FastMutex & mutex; + /// Length specifies number of updates to send, default -1 (no limit) + int64_t length; + UInt64 heartbeat_delay; + String blocks_hash; + UInt64 last_event_timestamp{0}; + Poco::Timestamp timestamp; +}; + +} diff --git a/dbms/src/Interpreters/InterpreterWatchQuery.cpp b/dbms/src/Interpreters/InterpreterWatchQuery.cpp new file mode 100644 index 00000000000..4672a42a304 --- /dev/null +++ b/dbms/src/Interpreters/InterpreterWatchQuery.cpp @@ -0,0 +1,102 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_STORAGE; + extern const int TOO_MANY_COLUMNS; +} + +BlockInputStreamPtr InterpreterWatchQuery::executeImpl() +{ + return std::make_shared(Block()); +} + +BlockIO InterpreterWatchQuery::execute() +{ + BlockIO res; + const ASTWatchQuery & query = typeid_cast(*query_ptr); + String database; + String table; + /// Get database + if (!query.database.empty()) + database = query.database; + else + database = context.getCurrentDatabase(); + + /// Get table + table = query.table; + + /// Get storage + storage = context.tryGetTable(database, table); + + /// List of columns to read to execute the query. + Names required_columns = storage->getColumns().getNamesOfPhysical(); + + /// Get context settings for this query + const Settings & settings = context.getSettingsRef(); + + /// Limitation on the number of columns to read. + if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) + throw Exception("Limit for number of columns to read exceeded. " + "Requested: " + std::to_string(required_columns.size()) + + ", maximum: " + settings.max_columns_to_read.toString(), + ErrorCodes::TOO_MANY_COLUMNS); + + size_t max_block_size = settings.max_block_size; + size_t max_streams = 1; + + /// Define query info + SelectQueryInfo query_info; + query_info.query = query_ptr; + + /// From stage + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + QueryProcessingStage::Enum to_stage = QueryProcessingStage::Complete; + + /// Watch storage + streams = storage->watch(required_columns, query_info, context, from_stage, max_block_size, max_streams); + + /// Constraints on the result, the quota on the result, and also callback for progress. + if (IBlockInputStream * stream = dynamic_cast(streams[0].get())) + { + /// Constraints apply only to the final result. + if (to_stage == QueryProcessingStage::Complete) + { + IBlockInputStream::LocalLimits limits; + limits.mode = IBlockInputStream::LIMITS_CURRENT; + limits.size_limits.max_rows = settings.max_result_rows; + limits.size_limits.max_bytes = settings.max_result_bytes; + limits.size_limits.overflow_mode = settings.result_overflow_mode; + + stream->setLimits(limits); + stream->setQuota(context.getQuota()); + } + } + + res.in = streams[0]; + + return res; +} + + +} diff --git a/dbms/src/Interpreters/InterpreterWatchQuery.h b/dbms/src/Interpreters/InterpreterWatchQuery.h new file mode 100644 index 00000000000..9315ee1f889 --- /dev/null +++ b/dbms/src/Interpreters/InterpreterWatchQuery.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class IAST; +using ASTPtr = std::shared_ptr; +using StoragePtr = std::shared_ptr; + +class InterpreterWatchQuery : public IInterpreter +{ +public: + InterpreterWatchQuery(const ASTPtr & query_ptr_, Context & context_) + : query_ptr(query_ptr_), context(context_) {} + + BlockIO execute() override; + +private: + ASTPtr query_ptr; + Context & context; + + BlockInputStreamPtr executeImpl(); + /// Table from where to read data, if not subquery. + StoragePtr storage; + /// Streams of read data + BlockInputStreams streams; +}; + + +} diff --git a/dbms/src/Parsers/ASTWatchQuery.h b/dbms/src/Parsers/ASTWatchQuery.h new file mode 100644 index 00000000000..7e75d62a629 --- /dev/null +++ b/dbms/src/Parsers/ASTWatchQuery.h @@ -0,0 +1,59 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#include + + +namespace DB +{ + +class ASTWatchQuery : public ASTQueryWithTableAndOutput +{ + +public: + ASTPtr limit_length; + bool is_watch_events; + + ASTWatchQuery() = default; + String getID(char) const override { return "WatchQuery_" + database + "_" + table; }; + + ASTPtr clone() const override + { + std::shared_ptr res = std::make_shared(*this); + res->children.clear(); + cloneOutputOptions(*res); + return res; + } + +protected: + void formatQueryImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override + { + std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' '); + + s.ostr << (s.hilite ? hilite_keyword : "") << "WATCH" << " " << (s.hilite ? hilite_none : "") + << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table); + + if (is_watch_events) + { + s.ostr << " " << (s.hilite ? hilite_keyword : "") << "EVENTS" << (s.hilite ? hilite_none : ""); + } + + if (limit_length) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : ""); + limit_length->formatImpl(s, state, frame); + } + } +}; + +} diff --git a/dbms/src/Parsers/ParserWatchQuery.cpp b/dbms/src/Parsers/ParserWatchQuery.cpp new file mode 100644 index 00000000000..c29320c5a56 --- /dev/null +++ b/dbms/src/Parsers/ParserWatchQuery.cpp @@ -0,0 +1,77 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +bool ParserWatchQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + ParserKeyword s_watch("WATCH"); + ParserToken s_dot(TokenType::Dot); + ParserIdentifier name_p; + ParserKeyword s_events("EVENTS"); + ParserKeyword s_limit("LIMIT"); + + ASTPtr database; + ASTPtr table; + auto query = std::make_shared(); + + if (!s_watch.ignore(pos, expected)) + { + return false; + } + + if (!name_p.parse(pos, table, expected)) + return false; + + if (s_dot.ignore(pos, expected)) + { + database = table; + if (!name_p.parse(pos, table, expected)) + return false; + } + + /// EVENTS + if (s_events.ignore(pos, expected)) + { + query->is_watch_events = true; + } + + /// LIMIT length + if (s_limit.ignore(pos, expected)) + { + ParserNumber num; + + if (!num.parse(pos, query->limit_length, expected)) + return false; + } + + if (database) + getIdentifierName(database, query->database); + + if (table) + getIdentifierName(table, query->table); + + node = query; + + return true; +} + + +} diff --git a/dbms/src/Parsers/ParserWatchQuery.h b/dbms/src/Parsers/ParserWatchQuery.h new file mode 100644 index 00000000000..330f0a432df --- /dev/null +++ b/dbms/src/Parsers/ParserWatchQuery.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#include + + +namespace DB +{ + +/** Query like this: + * WATCH [db.]table EVENTS + */ +class ParserWatchQuery : public IParserBase +{ +protected: + const char * getName() const { return "WATCH query"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); +}; + +} diff --git a/dbms/src/Storages/LiveViewCommands.h b/dbms/src/Storages/LiveViewCommands.h new file mode 100644 index 00000000000..35015a7e5aa --- /dev/null +++ b/dbms/src/Storages/LiveViewCommands.h @@ -0,0 +1,65 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_STORAGE; +} + +struct LiveViewCommand +{ + enum Type + { + REFRESH + }; + + Type type; + + ASTPtr values; + + static LiveViewCommand refresh(const ASTPtr & values) + { + LiveViewCommand res; + res.type = REFRESH; + res.values = values; + return res; + } + + static std::optional parse(ASTAlterCommand * command) + { + if (command->type == ASTAlterCommand::LIVE_VIEW_REFRESH) + return refresh(command->values); + return {}; + } +}; + + +class LiveViewCommands : public std::vector +{ +public: + void validate(const IStorage & table) + { + if (!empty() && !dynamic_cast(&table)) + throw Exception("Wrong storage type. Must be StorageLiveView", DB::ErrorCodes::UNKNOWN_STORAGE); + } +}; + +} diff --git a/dbms/src/Storages/ProxyStorage.h b/dbms/src/Storages/ProxyStorage.h new file mode 100644 index 00000000000..a4196b7d6f2 --- /dev/null +++ b/dbms/src/Storages/ProxyStorage.h @@ -0,0 +1,64 @@ +#pragma once + +#include + +namespace DB +{ + +class ProxyStorage : public IStorage +{ +public: + ProxyStorage(StoragePtr storage, BlockInputStreams streams, QueryProcessingStage::Enum to_stage) + : storage(std::move(storage)), streams(std::move(streams)), to_stage(to_stage) {} + +public: + std::string getName() const override { return "ProxyStorage(" + storage->getName() + ")"; } + std::string getTableName() const override { return storage->getTableName(); } + + bool isRemote() const override { return storage->isRemote(); } + bool supportsSampling() const override { return storage->supportsSampling(); } + bool supportsFinal() const override { return storage->supportsFinal(); } + bool supportsPrewhere() const override { return storage->supportsPrewhere(); } + bool supportsReplication() const override { return storage->supportsReplication(); } + bool supportsDeduplication() const override { return storage->supportsDeduplication(); } + + QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; } + + BlockInputStreams read( + const Names & /*column_names*/, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + size_t /*max_block_size*/, + unsigned /*num_streams*/) override + { + return streams; + } + + bool supportsIndexForIn() const override { return storage->supportsIndexForIn(); } + bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override { return storage->mayBenefitFromIndexForIn(left_in_operand, query_context); } + ASTPtr getPartitionKeyAST() const override { return storage->getPartitionKeyAST(); } + ASTPtr getSortingKeyAST() const override { return storage->getSortingKeyAST(); } + ASTPtr getPrimaryKeyAST() const override { return storage->getPrimaryKeyAST(); } + ASTPtr getSamplingKeyAST() const override { return storage->getSamplingKeyAST(); } + Names getColumnsRequiredForPartitionKey() const override { return storage->getColumnsRequiredForPartitionKey(); } + Names getColumnsRequiredForSortingKey() const override { return storage->getColumnsRequiredForSortingKey(); } + Names getColumnsRequiredForPrimaryKey() const override { return storage->getColumnsRequiredForPrimaryKey(); } + Names getColumnsRequiredForSampling() const override { return storage->getColumnsRequiredForSampling(); } + Names getColumnsRequiredForFinal() const override { return storage->getColumnsRequiredForFinal(); } + + NameAndTypePair getColumn(const String & column_name) const override { return storage->getColumn(column_name); } + bool hasColumn(const String & column_name) const override { return storage->hasColumn(column_name); } + static StoragePtr createProxyStorage(StoragePtr storage, BlockInputStreams streams, QueryProcessingStage::Enum to_stage) + { + return std::make_shared(std::move(storage), std::move(streams), to_stage); + } +private: + StoragePtr storage; + BlockInputStreams streams; + QueryProcessingStage::Enum to_stage; +}; + + + +} diff --git a/dbms/src/Storages/StorageLiveView.cpp b/dbms/src/Storages/StorageLiveView.cpp new file mode 100644 index 00000000000..5a4e225e52c --- /dev/null +++ b/dbms/src/Storages/StorageLiveView.cpp @@ -0,0 +1,405 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int INCORRECT_QUERY; + extern const int TABLE_WAS_NOT_DROPPED; + extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW; +} + +static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name) +{ + auto db_and_table = getDatabaseAndTable(query, 0); + ASTPtr subquery = extractTableExpression(query, 0); + + if (!db_and_table && !subquery) + return; + + if (db_and_table) + { + select_table_name = db_and_table->table; + + if (db_and_table->database.empty()) + { + db_and_table->database = select_database_name; + AddDefaultDatabaseVisitor visitor(select_database_name); + visitor.visit(query); + } + else + select_database_name = db_and_table->database; + } + else if (auto * ast_select = subquery->as()) + { + if (ast_select->list_of_selects->children.size() != 1) + throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); + + auto & inner_query = ast_select->list_of_selects->children.at(0); + + extractDependentTable(inner_query->as(), select_database_name, select_table_name); + } + else + throw Exception("Logical error while creating StorageLiveView." + " Could not retrieve table name from select query.", + DB::ErrorCodes::LOGICAL_ERROR); +} + +static void checkAllowedQueries(const ASTSelectQuery & query) +{ + if (query.prewhere() || query.final() || query.sample_size()) + throw Exception("LIVE VIEW cannot have PREWHERE, SAMPLE or FINAL.", DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); + + ASTPtr subquery = extractTableExpression(query, 0); + if (!subquery) + return; + + if (const auto * ast_select = subquery->as()) + { + if (ast_select->list_of_selects->children.size() != 1) + throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); + + const auto & inner_query = ast_select->list_of_selects->children.at(0); + + checkAllowedQueries(inner_query->as()); + } +} + +StorageLiveView::StorageLiveView( + const String & table_name_, + const String & database_name_, + Context & local_context, + const ASTCreateQuery & query, + const ColumnsDescription & columns) + : IStorage(columns), table_name(table_name_), + database_name(database_name_), global_context(local_context.getGlobalContext()) +{ + if (!query.select) + throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); + + /// Default value, if only table name exist in the query + select_database_name = local_context.getCurrentDatabase(); + if (query.select->list_of_selects->children.size() != 1) + throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); + + inner_query = query.select->list_of_selects->children.at(0); + + ASTSelectQuery & select_query = typeid_cast(*inner_query); + extractDependentTable(select_query, select_database_name, select_table_name); + + /// If the table is not specified - use the table `system.one` + if (select_table_name.empty()) + { + select_database_name = "system"; + select_table_name = "one"; + } + + global_context.addDependency( + DatabaseAndTableName(select_database_name, select_table_name), + DatabaseAndTableName(database_name, table_name)); + + is_temporary = query.temporary; + + blocks_ptr = std::make_shared(); + active_ptr = std::make_shared(true); +} + +Block StorageLiveView::getHeader() const +{ + if (!sample_block) + { + auto storage = global_context.getTable(select_database_name, select_table_name); + sample_block = InterpreterSelectQuery(inner_query, global_context, storage).getSampleBlock(); + } + + return sample_block; +} + +bool StorageLiveView::getNewBlocks() +{ + Block block; + SipHash hash; + UInt128 key; + BlocksPtr new_blocks = std::make_shared(); + BlocksPtr new_mergeable_blocks = std::make_shared(); + + InterpreterSelectQuery interpreter(inner_query->clone(), global_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); + auto mergeable_stream = std::make_shared(interpreter.execute().in); + + while (Block block = mergeable_stream->read()) + new_mergeable_blocks->push_back(block); + + mergeable_blocks = std::make_shared>(); + mergeable_blocks->push_back(new_mergeable_blocks); + BlockInputStreamPtr from = std::make_shared(std::make_shared(new_mergeable_blocks), mergeable_stream->getHeader()); + auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState); + InterpreterSelectQuery select(inner_query->clone(), global_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete)); + BlockInputStreamPtr data = std::make_shared(select.execute().in); + while (Block block = data->read()) + { + block.updateHash(hash); + new_blocks->push_back(block); + } + + hash.get128(key.low, key.high); + + /// Update blocks only if hash keys do not match + /// NOTE: hash could be different for the same result + /// if blocks are not in the same order + bool updated = false; + { + if (hash_key != key.toHexString()) + { + if (new_blocks->empty()) + { + new_blocks->push_back(getHeader()); + } + (*blocks_ptr) = new_blocks; + hash_key = key.toHexString(); + updated = true; + } + } + return updated; +} + +void StorageLiveView::checkTableCanBeDropped() const +{ + Dependencies dependencies = global_context.getDependencies(database_name, table_name); + if (!dependencies.empty()) + { + DatabaseAndTableName database_and_table_name = dependencies.front(); + throw Exception("Table has dependency " + database_and_table_name.first + "." + database_and_table_name.second, ErrorCodes::TABLE_WAS_NOT_DROPPED); + } +} + +void StorageLiveView::noUsersThread() +{ + if (shutdown_called) + return; + + bool drop_table = false; + + { + Poco::FastMutex::ScopedLock lock(noUsersThreadMutex); + while (1) + { + if (!noUsersThreadWakeUp && !noUsersThreadCondition.tryWait(noUsersThreadMutex, global_context.getSettingsRef().temporary_live_view_timeout.totalSeconds() * 1000)) + { + noUsersThreadWakeUp = false; + if (shutdown_called) + return; + if (hasUsers()) + return; + if (!global_context.getDependencies(database_name, table_name).empty()) + continue; + drop_table = true; + } + break; + } + } + + if (drop_table) + { + if (global_context.tryGetTable(database_name, table_name)) + { + try + { + /// We create and execute `drop` query for this table + auto drop_query = std::make_shared(); + drop_query->database = database_name; + drop_query->table = table_name; + ASTPtr ast_drop_query = drop_query; + InterpreterDropQuery drop_interpreter(ast_drop_query, global_context); + drop_interpreter.execute(); + } + catch (...) + { + } + } + } +} + +void StorageLiveView::startNoUsersThread() +{ + bool expected = false; + if (!startnousersthread_called.compare_exchange_strong(expected, true)) + return; + + if (is_dropped) + return; + + if (is_temporary) + { + if (no_users_thread.joinable()) + { + { + Poco::FastMutex::ScopedLock lock(noUsersThreadMutex); + noUsersThreadWakeUp = true; + noUsersThreadCondition.signal(); + } + no_users_thread.join(); + } + { + Poco::FastMutex::ScopedLock lock(noUsersThreadMutex); + noUsersThreadWakeUp = false; + } + if (!is_dropped) + no_users_thread = std::thread(&StorageLiveView::noUsersThread, this); + } + startnousersthread_called = false; +} + +void StorageLiveView::startup() +{ + startNoUsersThread(); +} + +void StorageLiveView::shutdown() +{ + bool expected = false; + if (!shutdown_called.compare_exchange_strong(expected, true)) + return; + + if (no_users_thread.joinable()) + { + Poco::FastMutex::ScopedLock lock(noUsersThreadMutex); + noUsersThreadWakeUp = true; + noUsersThreadCondition.signal(); + /// Must detach the no users thread + /// as we can't join it as it will result + /// in a deadlock + no_users_thread.detach(); + } +} + +StorageLiveView::~StorageLiveView() +{ + shutdown(); +} + +void StorageLiveView::drop() +{ + global_context.removeDependency( + DatabaseAndTableName(select_database_name, select_table_name), + DatabaseAndTableName(database_name, table_name)); + Poco::FastMutex::ScopedLock lock(mutex); + is_dropped = true; + condition.broadcast(); +} + +void StorageLiveView::refresh(const Context & context) +{ + auto alter_lock = lockAlterIntention(context.getCurrentQueryId()); + { + Poco::FastMutex::ScopedLock lock(mutex); + if (getNewBlocks()) + condition.broadcast(); + } +} + +BlockInputStreams StorageLiveView::read( + const Names & /*column_names*/, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + /// add user to the blocks_ptr + std::shared_ptr stream_blocks_ptr = blocks_ptr; + { + Poco::FastMutex::ScopedLock lock(mutex); + if (!(*blocks_ptr)) + { + if (getNewBlocks()) + condition.broadcast(); + } + } + return { std::make_shared(stream_blocks_ptr, getHeader()) }; +} + +BlockInputStreams StorageLiveView::watch( + const Names & /*column_names*/, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + ASTWatchQuery & query = typeid_cast(*query_info.query); + + /// By default infinite stream of updates + int64_t length = -2; + + if (query.limit_length) + length = (int64_t)safeGet(typeid_cast(*query.limit_length).value); + + auto reader = std::make_shared(*this, blocks_ptr, active_ptr, condition, mutex, length, context.getSettingsRef().heartbeat_delay); + + if (no_users_thread.joinable()) + { + Poco::FastMutex::ScopedLock lock(noUsersThreadMutex); + noUsersThreadWakeUp = true; + noUsersThreadCondition.signal(); + } + + { + Poco::FastMutex::ScopedLock lock(mutex); + if (!(*blocks_ptr)) + { + if (getNewBlocks()) + condition.broadcast(); + } + } + + processed_stage = QueryProcessingStage::Complete; + + return { reader }; +} + +BlockOutputStreamPtr StorageLiveView::write(const ASTPtr & /*query*/, const Context & /*context*/) +{ + return std::make_shared(*this); +} + +void registerStorageLiveView(StorageFactory & factory) +{ + factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args) + { + return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns); + }); +} + +} diff --git a/dbms/src/Storages/StorageLiveView.h b/dbms/src/Storages/StorageLiveView.h new file mode 100644 index 00000000000..cb90e2b3e71 --- /dev/null +++ b/dbms/src/Storages/StorageLiveView.h @@ -0,0 +1,294 @@ +/* Copyright (c) 2018 BlackBerry Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class IAST; +using ASTPtr = std::shared_ptr; + +class StorageLiveView : public ext::shared_ptr_helper, public IStorage +{ +friend struct ext::shared_ptr_helper; +friend class LiveViewBlockOutputStream; + +public: + ~StorageLiveView() override; + String getName() const override { return "LiveView"; } + String getTableName() const override { return table_name; } + String getDatabaseName() const { return database_name; } + String getSelectDatabaseName() const { return select_database_name; } + String getSelectTableName() const { return select_table_name; } + + // const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } + ASTPtr getInnerQuery() const { return inner_query->clone(); }; + + /// It is passed inside the query and solved at its level. + bool supportsSampling() const override { return true; } + bool supportsFinal() const override { return true; } + + /// Mutex for the blocks and ready condition + Poco::FastMutex mutex; + /// New blocks ready condition to broadcast to readers + /// that new blocks are available + Poco::Condition condition; + + bool isTemporary() { return is_temporary; } + + /// Check if we have any readers + /// must be called with mutex locked + bool hasUsers() + { + return blocks_ptr.use_count() > 1; + } + + /// Check we we have any active readers + /// must be called with mutex locked + bool hasActiveUsers() + { + return active_ptr.use_count() > 1; + } + /// Background thread for temporary tables + /// which drops this table if there are no users + void startNoUsersThread(); + Poco::FastMutex noUsersThreadMutex; + bool noUsersThreadWakeUp{false}; + Poco::Condition noUsersThreadCondition; + + String getBlocksHashKey() + { + return hash_key; + } + + /// Reset blocks + /// must be called with mutex locked + void reset() + { + (*blocks_ptr).reset(); + mergeable_blocks.reset(); + hash_key = ""; + } + + void checkTableCanBeDropped() const override; + void drop() override; + void startup() override; + void shutdown() override; + + void refresh(const Context & context); + + BlockOutputStreamPtr write( + const ASTPtr &, + const Context &) override; + + BlockInputStreams read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) override; + + BlockInputStreams watch( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned num_streams) override; + + std::shared_ptr getBlocksPtr() { return blocks_ptr; } + BlocksPtrs getMergeableBlocks() { return mergeable_blocks; } + std::shared_ptr getActivePtr() { return active_ptr; } + + /// Read new data blocks that store query result + bool getNewBlocks(); + + Block getHeader() const; + + static void writeIntoLiveView(StorageLiveView & live_view, + const Block & block, + const Context & context, + BlockOutputStreamPtr & output) + { + /// Check if live view has any readers if not + /// just reset blocks to empty and do nothing else + /// When first reader comes the blocks will be read. + { + Poco::FastMutex::ScopedLock lock(live_view.mutex); + if (!live_view.hasActiveUsers()) + { + live_view.reset(); + return; + } + } + + SipHash hash; + UInt128 key; + BlockInputStreams from; + BlocksPtr blocks = std::make_shared(); + BlocksPtrs mergeable_blocks; + BlocksPtr new_mergeable_blocks = std::make_shared(); + + { + auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); + BlockInputStreams streams = {std::make_shared(block)}; + auto proxy_storage = std::make_shared(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns); + InterpreterSelectQuery select_block(live_view.getInnerQuery(), context, proxy_storage, + QueryProcessingStage::WithMergeableState); + auto data_mergeable_stream = std::make_shared(select_block.execute().in); + while (Block this_block = data_mergeable_stream->read()) + new_mergeable_blocks->push_back(this_block); + } + + if (new_mergeable_blocks->empty()) + return; + + { + Poco::FastMutex::ScopedLock lock(live_view.mutex); + + mergeable_blocks = live_view.getMergeableBlocks(); + if (!mergeable_blocks || mergeable_blocks->size() >= 64) + { + mergeable_blocks = std::make_shared>(); + BlocksPtr base_mergeable_blocks = std::make_shared(); + InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names{}); + auto view_mergeable_stream = std::make_shared(interpreter.execute().in); + while (Block this_block = view_mergeable_stream->read()) + base_mergeable_blocks->push_back(this_block); + mergeable_blocks->push_back(base_mergeable_blocks); + } + + /// Need make new mergeable block structure match the other mergeable blocks + if (!mergeable_blocks->front()->empty() && !new_mergeable_blocks->empty()) + { + auto sample_block = mergeable_blocks->front()->front(); + auto sample_new_block = new_mergeable_blocks->front(); + for (auto col : sample_new_block) + { + for (auto & new_block : *new_mergeable_blocks) + { + if (!sample_block.has(col.name)) + new_block.erase(col.name); + } + } + } + + mergeable_blocks->push_back(new_mergeable_blocks); + + /// Create from blocks streams + for (auto & blocks : *mergeable_blocks) + { + auto sample_block = mergeable_blocks->front()->front().cloneEmpty(); + BlockInputStreamPtr stream = std::make_shared(std::make_shared(blocks), sample_block); + from.push_back(std::move(stream)); + } + } + + auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); + auto proxy_storage = std::make_shared(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState); + InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete); + BlockInputStreamPtr data = std::make_shared(select.execute().in); + while (Block this_block = data->read()) + { + this_block.updateHash(hash); + blocks->push_back(this_block); + } + /// get hash key + hash.get128(key.low, key.high); + /// Update blocks only if hash keys do not match + /// NOTE: hash could be different for the same result + /// if blocks are not in the same order + if (live_view.getBlocksHashKey() != key.toHexString()) + { + auto sample_block = blocks->front().cloneEmpty(); + BlockInputStreamPtr new_data = std::make_shared(std::make_shared(blocks), sample_block); + { + Poco::FastMutex::ScopedLock lock(live_view.mutex); + copyData(*new_data, *output); + } + } + } + +private: + String select_database_name; + String select_table_name; + String table_name; + String database_name; + ASTPtr inner_query; + Context & global_context; + bool is_temporary {false}; + mutable Block sample_block; + + /// Active users + std::shared_ptr active_ptr; + /// Current data blocks that store query result + std::shared_ptr blocks_ptr; + BlocksPtr new_blocks; + BlocksPtrs mergeable_blocks; + + /// Current blocks hash key + String hash_key; + String new_hash_key; + + void noUsersThread(); + std::thread no_users_thread; + std::atomic shutdown_called{false}; + std::atomic startnousersthread_called{false}; + + StorageLiveView( + const String & table_name_, + const String & database_name_, + Context & local_context, + const ASTCreateQuery & query, + const ColumnsDescription & columns + ); +}; + +class LiveViewBlockOutputStream : public IBlockOutputStream +{ +public: + explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {} + + void write(const Block & block) override + { + if (!new_blocks) + new_blocks = std::make_shared(); + + new_blocks->push_back(block); + // FIXME: do I need to calculate block hash? + (*storage.blocks_ptr) = new_blocks; + new_blocks.reset(); + storage.condition.broadcast(); + } + + Block getHeader() const override { return storage.getHeader(); } + +private: + BlocksPtr new_blocks; + String new_hash_key; + StorageLiveView & storage; +}; + +}