From 4c2f5e1c8da47559860bf3f7ab3f4eaf6a209afe Mon Sep 17 00:00:00 2001 From: Vxider Date: Fri, 24 Jan 2020 05:45:45 +0300 Subject: [PATCH] build fix --- .../PushingToViewsBlockOutputStream.cpp | 12 +-- .../Storages/WindowView/StorageWindowView.cpp | 73 ++++++++----------- .../Storages/WindowView/StorageWindowView.h | 19 +---- .../WindowView/WindowViewProxyStorage.h | 6 +- 4 files changed, 40 insertions(+), 70 deletions(-) diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 49bc1eac520..66ce5e9f9ee 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -84,18 +84,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( out = io.out; } else if (dynamic_cast(dependent_table.get())) - out = std::make_shared( - database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true); + out = std::make_shared(dependent_table, *views_context, ASTPtr(), true); else if (dynamic_cast(dependent_table.get())) - out = std::make_shared( - database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true); - else - out = std::make_shared( - database_table.first, database_table.second, dependent_table, *views_context, ASTPtr()); - - views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)}); - } - else if (dynamic_cast(dependent_table.get())) out = std::make_shared(dependent_table, *views_context, ASTPtr(), true); else out = std::make_shared(dependent_table, *views_context, ASTPtr()); diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 8ff4155c3d7..55aa3fa8426 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -152,20 +152,19 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa void StorageWindowView::checkTableCanBeDropped() const { - Dependencies dependencies = global_context.getDependencies(database_name, table_name); + auto table_id = getStorageID(); + Dependencies dependencies = global_context.getDependencies(table_id); if (!dependencies.empty()) { - DatabaseAndTableName database_and_table_name = dependencies.front(); - throw Exception( - "Table has dependency " + database_and_table_name.first + "." + database_and_table_name.second, - ErrorCodes::TABLE_WAS_NOT_DROPPED); + StorageID dependent_table_id = dependencies.front(); + throw Exception("Table has dependency " + dependent_table_id.getNameForLogs(), ErrorCodes::TABLE_WAS_NOT_DROPPED); } } void StorageWindowView::drop(TableStructureWriteLockHolder &) { - global_context.removeDependency( - DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name)); + auto table_id = getStorageID(); + global_context.removeDependency(select_table_id, table_id); std::lock_guard lock(mutex); is_dropped = true; @@ -207,7 +206,7 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec) void StorageWindowView::threadFuncToTable() { - while (!shutdown_called && has_target_table) + while (!shutdown_called && !target_table_id.empty()) { std::unique_lock lock(flushTableMutex); UInt64 timestamp_usec = static_cast(Poco::Timestamp().epochMicroseconds()); @@ -236,7 +235,7 @@ BlockInputStreams StorageWindowView::watch( size_t /*max_block_size*/, const unsigned /*num_streams*/) { - if (has_target_table) + if (!target_table_id.empty()) throw Exception("WATCH query is disabled for " + getName() + " when constructed with 'TO' clause.", ErrorCodes::INCORRECT_QUERY); if (active_ptr.use_count() > 1) @@ -279,9 +278,8 @@ Block StorageWindowView::getHeader() const { if (!sample_block) { - auto storage = global_context.getTable(select_database_name, select_table_name); - sample_block = InterpreterSelectQuery( - getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::Complete)) + auto storage = global_context.getTable(select_table_id); + sample_block = InterpreterSelectQuery(getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::Complete)) .getSampleBlock(); for (size_t i = 0; i < sample_block.columns(); ++i) sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); @@ -290,24 +288,14 @@ Block StorageWindowView::getHeader() const return sample_block; } -StoragePtr & StorageWindowView::getParentStorage() -{ - if (!parent_storage) - parent_storage = global_context.getTable(getSelectDatabaseName(), getSelectTableName()); - return parent_storage; -} - StorageWindowView::StorageWindowView( - const String & table_name_, - const String & database_name_, + const StorageID & table_id_, Context & local_context, const ASTCreateQuery & query, const ColumnsDescription & columns_) - : table_name(table_name_) - , database_name(database_name_) + : IStorage(table_id_) , global_context(local_context.getGlobalContext()) , time_zone(DateLUT::instance()) - , log(&Poco::Logger::get("StorageWindowView")) { setColumns(columns_); @@ -315,14 +303,16 @@ StorageWindowView::StorageWindowView( throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); /// Default value, if only table name exist in the query - select_database_name = local_context.getCurrentDatabase(); if (query.select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); auto inner_query_ = query.select->list_of_selects->children.at(0); ASTSelectQuery & select_query = typeid_cast(*inner_query_); + String select_database_name = local_context.getCurrentDatabase(); + String select_table_name; extractDependentTable(select_query, select_database_name, select_table_name); + select_table_id = StorageID(select_database_name, select_table_name); inner_query = innerQueryParser(select_query); /// If the table is not specified - use the table `system.one` @@ -332,14 +322,13 @@ StorageWindowView::StorageWindowView( select_table_name = "one"; } - global_context.addDependency( - DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name)); + global_context.addDependency(select_table_id, table_id_); + + parent_storage = local_context.getTable(select_table_id); if (!query.to_table.empty()) { - has_target_table = true; - target_database_name = query.to_database; - target_table_name = query.to_table; + target_table_id = StorageID(query.to_database, query.to_table); } is_temporary = query.temporary; @@ -349,7 +338,7 @@ StorageWindowView::StorageWindowView( active_ptr = std::make_shared(true); - toTableTask = global_context.getSchedulePool().createTask(log->name(), [this] { threadFuncToTable(); }); + toTableTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncToTable(); }); toTableTask->deactivate(); } @@ -403,7 +392,7 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query) void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context) { BlockInputStreams streams = {std::make_shared(block)}; - auto window_proxy_storage = std::make_shared( + auto window_proxy_storage = std::make_shared(StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns); InterpreterSelectQuery select_block( window_view.getInnerQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); @@ -432,18 +421,18 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con StoragePtr StorageWindowView::getTargetTable() const { - return global_context.getTable(target_database_name, target_table_name); + return global_context.getTable(target_table_id); } StoragePtr StorageWindowView::tryGetTargetTable() const { - return global_context.tryGetTable(target_database_name, target_table_name); + return global_context.tryGetTable(target_table_id); } void StorageWindowView::startup() { // Start the working thread - if (has_target_table) + if (!target_table_id.empty()) toTableTask->activateAndSchedule(); startNoUsersThread(temporary_window_view_timeout); } @@ -553,7 +542,7 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr() BlockInputStreamPtr stream = std::make_shared(mergeable_blocks, sample_block_, mutex); from.push_back(std::move(stream)); auto proxy_storage = std::make_shared( - getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); + StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete); BlockInputStreamPtr data = std::make_shared(select.execute().in); return data; @@ -575,6 +564,7 @@ void StorageWindowView::noUsersThread(std::shared_ptr storage if (storage->shutdown_called) return; + auto table_id = storage->getStorageID(); { while (1) { @@ -584,7 +574,7 @@ void StorageWindowView::noUsersThread(std::shared_ptr storage storage->no_users_thread_wakeup = false; if (storage->shutdown_called) return; - if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) + if (!storage->global_context.getDependencies(table_id).empty()) continue; drop_table = true; } @@ -594,14 +584,14 @@ void StorageWindowView::noUsersThread(std::shared_ptr storage if (drop_table) { - if (storage->global_context.tryGetTable(storage->database_name, storage->table_name)) + if (storage->global_context.tryGetTable(table_id)) { try { /// We create and execute `drop` query for this table auto drop_query = std::make_shared(); - drop_query->database = storage->database_name; - drop_query->table = storage->table_name; + drop_query->database = table_id.database_name; + drop_query->table = table_id.table_name; drop_query->kind = ASTDropQuery::Kind::Drop; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context); @@ -657,7 +647,8 @@ void registerStorageWindowView(StorageFactory & factory) throw Exception( "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')", ErrorCodes::SUPPORT_IS_DISABLED); - return StorageWindowView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns); + + return StorageWindowView::create(args.table_id, args.local_context, args.query, args.columns); }); } } diff --git a/dbms/src/Storages/WindowView/StorageWindowView.h b/dbms/src/Storages/WindowView/StorageWindowView.h index 94c900acf28..c60f62822c2 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.h +++ b/dbms/src/Storages/WindowView/StorageWindowView.h @@ -23,10 +23,6 @@ class StorageWindowView : public ext::shared_ptr_helper, publ public: ~StorageWindowView() override; String getName() const override { return "WindowView"; } - String getTableName() const override { return table_name; } - String getDatabaseName() const override { return database_name; } - String getSelectDatabaseName() const { return select_database_name; } - String getSelectTableName() const { return select_table_name; } ASTPtr getInnerQuery() const { return inner_query->clone(); } @@ -75,7 +71,7 @@ public: Block getHeader() const; - StoragePtr & getParentStorage(); + StoragePtr & getParentStorage() { return parent_storage; } static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context); @@ -84,10 +80,7 @@ public: inline UInt32 getWindowUpperBound(UInt32 time_sec); private: - String select_database_name; - String select_table_name; - String table_name; - String database_name; + StorageID select_table_id = StorageID::createEmpty(); ASTPtr inner_query; String window_column_name; String window_end_column_alias; @@ -111,9 +104,7 @@ private: Int64 window_num_units; const DateLUTImpl & time_zone; - std::atomic has_target_table{false}; - String target_database_name; - String target_table_name; + StorageID target_table_id = StorageID::createEmpty(); static void noUsersThread(std::shared_ptr storage, const UInt64 & timeout); inline void flushToTable(); @@ -125,15 +116,13 @@ private: std::atomic start_no_users_thread_called{false}; UInt64 temporary_window_view_timeout; - Poco::Logger * log; Poco::Timestamp timestamp; BackgroundSchedulePool::TaskHolder toTableTask; BackgroundSchedulePool::TaskHolder toTableTask_preprocess; StorageWindowView( - const String & table_name_, - const String & database_name_, + const StorageID & table_id_, Context & local_context, const ASTCreateQuery & query, const ColumnsDescription & columns); diff --git a/dbms/src/Storages/WindowView/WindowViewProxyStorage.h b/dbms/src/Storages/WindowView/WindowViewProxyStorage.h index 79b069c3745..377b04c6713 100644 --- a/dbms/src/Storages/WindowView/WindowViewProxyStorage.h +++ b/dbms/src/Storages/WindowView/WindowViewProxyStorage.h @@ -12,14 +12,14 @@ namespace DB class WindowViewProxyStorage : public IStorage { public: - WindowViewProxyStorage(StoragePtr storage_, BlockInputStreams streams_, QueryProcessingStage::Enum to_stage_) - : storage(std::move(storage_)) + WindowViewProxyStorage(const StorageID & table_id_, StoragePtr storage_, BlockInputStreams streams_, QueryProcessingStage::Enum to_stage_) + : IStorage(table_id_) + , storage(std::move(storage_)) , streams(std::move(streams_)) , to_stage(to_stage_) {} public: std::string getName() const override { return "WindowViewProxyStorage(" + storage->getName() + ")"; } - std::string getTableName() const override { return storage->getTableName(); } bool isRemote() const override { return storage->isRemote(); } bool supportsSampling() const override { return storage->supportsSampling(); }