diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 4bea23d4e90..2223b61efeb 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -206,10 +206,12 @@ namespace DB DECLARE(UInt64, threadpool_writer_pool_size, 100, "Size of background pool for write requests to object storages", 0) \ DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \ DECLARE(UInt32, allowed_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ + DECLARE(UInt64, startup_mv_delay_ms, 0, "Debug parameter to simulate materizlied view creation delay", 0) \ // clang-format on + /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below DECLARE_SETTINGS_TRAITS(ServerSettingsTraits, LIST_OF_SERVER_SETTINGS) diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index ad12f1264f8..eb90bbe25de 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -209,6 +209,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables { size_t prev_tables_count = metadata.parsed_tables.size(); size_t prev_total_dictionaries = metadata.total_dictionaries; + size_t prev_total_materialized_views = metadata.total_materialized_views; auto process_metadata = [&metadata, is_startup, local_context, this](const String & file_name) { @@ -276,6 +277,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables std::lock_guard lock{metadata.mutex}; metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast}; metadata.total_dictionaries += create_query->is_dictionary; + metadata.total_materialized_views += create_query->is_materialized_view; } } catch (Exception & e) @@ -289,10 +291,17 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count; size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries; + size_t materialized_views_in_database = metadata.total_materialized_views - prev_total_materialized_views; size_t tables_in_database = objects_in_database - dictionaries_in_database; - LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.", - TSA_SUPPRESS_WARNING_FOR_READ(database_name), tables_in_database, dictionaries_in_database); + LOG_INFO(log, "Metadata processed, database {} has {} tables, {} dictionaries and {} materialized views in total.", + TSA_SUPPRESS_WARNING_FOR_READ(database_name), tables_in_database, dictionaries_in_database, materialized_views_in_database); + + // if (materialized_views_in_database) + // { + + // } + } void DatabaseOrdinary::loadTableFromMetadata( @@ -317,6 +326,8 @@ void DatabaseOrdinary::loadTableFromMetadata( mode); attachTable(local_context, table_name, table, getTableDataPath(query)); + + table->pushDependencies(); } catch (Exception & e) { diff --git a/src/Databases/DatabasesCommon.cpp b/src/Databases/DatabasesCommon.cpp index 7bb4d23ef92..02ec58e3a95 100644 --- a/src/Databases/DatabasesCommon.cpp +++ b/src/Databases/DatabasesCommon.cpp @@ -434,6 +434,8 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c for (auto metric : getAttachedCountersForStorage(table)) CurrentMetrics::add(metric); } + + // if (DatabaseCatalog::iiMa } void DatabaseWithOwnTablesBase::shutdown() diff --git a/src/Databases/TablesLoader.cpp b/src/Databases/TablesLoader.cpp index 733e5d53981..d2dc9f05b4e 100644 --- a/src/Databases/TablesLoader.cpp +++ b/src/Databases/TablesLoader.cpp @@ -107,6 +107,8 @@ LoadTaskPtrs TablesLoader::startupTablesAsync(LoadJobSet startup_after) LoadTaskPtrs result; std::unordered_map startup_database; /// database name -> all its tables startup tasks + + for (const auto & table_id : all_loading_dependencies.getTables()) { // Make startup table task diff --git a/src/Databases/TablesLoader.h b/src/Databases/TablesLoader.h index bf469d83245..3e3736aa109 100644 --- a/src/Databases/TablesLoader.h +++ b/src/Databases/TablesLoader.h @@ -44,6 +44,7 @@ struct ParsedTablesMetadata /// For logging size_t total_dictionaries = 0; + size_t total_materialized_views = 0; }; /// Loads tables (and dictionaries) from specified databases diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0dc48634282..454f7df5ec1 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -572,6 +572,11 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part moves between shards are not supported by storage {}", getName()); } + virtual void pushDependencies() + { + } + + /** If the table have to do some complicated work on startup, * that must be postponed after creation of table object * (like launching some background threads), diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 294e983388e..085cb1690d9 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -187,6 +187,7 @@ StorageKafka::StorageKafka( , thread_per_consumer((*kafka_settings)[KafkaSetting::kafka_thread_per_consumer].value) , collection_name(collection_name_) { + LOG_TRACE(log, "Top of StorageKafka ctor."); kafka_settings->sanityCheck(); if ((*kafka_settings)[KafkaSetting::kafka_handle_error_mode] == StreamingHandleErrorMode::STREAM) @@ -524,6 +525,7 @@ size_t StorageKafka::getPollTimeoutMillisecond() const void StorageKafka::threadFunc(size_t idx) { + LOG_DEBUG(log, "Top of StorageKafka::threadFunc"); assert(idx < tasks.size()); auto task = tasks[idx]; std::string exception_str; @@ -533,15 +535,18 @@ void StorageKafka::threadFunc(size_t idx) auto table_id = getStorageID(); // Check if at least one direct dependency is attached size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size(); + LOG_DEBUG(log, "StorageKafka::threadFunc - before if"); if (num_views) { auto start_time = std::chrono::steady_clock::now(); mv_attached.store(true); + LOG_DEBUG(log, "StorageKafka::threadFunc - before while"); // Keep streaming as long as there are attached views and streaming is not cancelled while (!task->stream_cancelled) { + LOG_DEBUG(log, "StorageKafka::threadFunc - before StorageKafkaUtils::checkDependencies"); if (!StorageKafkaUtils::checkDependencies(table_id, getContext())) break; @@ -564,6 +569,9 @@ void StorageKafka::threadFunc(size_t idx) } } } + else + LOG_DEBUG(log, "No attached views"); + } catch (...) { diff --git a/src/Storages/Kafka/StorageKafkaUtils.cpp b/src/Storages/Kafka/StorageKafkaUtils.cpp index 119aadd11d8..7ce9152aeff 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.cpp +++ b/src/Storages/Kafka/StorageKafkaUtils.cpp @@ -425,15 +425,19 @@ bool checkDependencies(const StorageID & table_id, const ContextPtr& context) // Check the dependencies are ready? for (const auto & view_id : view_ids) { + LOG_TRACE(&Poco::Logger::get("kafka checkDependencies"), "Top of for"); + auto view = DatabaseCatalog::instance().tryGetTable(view_id, context); if (!view) return false; + LOG_TRACE(&Poco::Logger::get("kafka checkDependencies"), "Target table"); // If it materialized view, check it's target table auto * materialized_view = dynamic_cast(view.get()); if (materialized_view && !materialized_view->tryGetTargetTable()) return false; + LOG_TRACE(&Poco::Logger::get("kafka checkDependencies"), "Transitive dependencies"); // Check all its dependencies if (!checkDependencies(view_id, context)) return false; diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 9491c40b65f..d4049d2f388 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -40,6 +40,8 @@ #include +#include + namespace DB { namespace Setting @@ -745,8 +747,35 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id) refresher->rename(new_table_id, getTargetTableId()); } +void StorageMaterializedView::pushDependencies() +{ + assert(!dependencies_are_tracked); + if (!dependencies_are_tracked) + { + auto metadata_snapshot = getInMemoryMetadataPtr(); + const auto & select_query = metadata_snapshot->getSelectQuery(); + if (!select_query.select_table_id.empty()) + DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID()); + dependencies_are_tracked = true; + } +} + void StorageMaterializedView::startup() { + if (const auto configured_delay_ms = getContext()->getServerSettings().startup_mv_delay_ms; configured_delay_ms) + { + std::random_device rd; + const auto delay_ms = std::uniform_int_distribution<>(0, 1)(rd) ? configured_delay_ms : 0UL; + if (delay_ms) + { + LOG_DEBUG(&Poco::Logger::get("StorageMaterializedView"), "sleeping in startup of {}", getStorageID().table_name); + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + LOG_DEBUG(&Poco::Logger::get("StorageMaterializedView"), "woken up in startup of {}", getStorageID().table_name); + } + } + + pushDependencies(); + auto metadata_snapshot = getInMemoryMetadataPtr(); const auto & select_query = metadata_snapshot->getSelectQuery(); if (!select_query.select_table_id.empty()) diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index e39642066b4..690b206a892 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -67,6 +67,7 @@ public: void renameInMemory(const StorageID & new_table_id) override; + void pushDependencies() override; void startup() override; void shutdown(bool is_drop) override; @@ -116,6 +117,8 @@ private: /// have UUID, and we do inner table lookup by name instead. bool fixed_uuid = true; + bool dependencies_are_tracked = false; + friend class RefreshTask; void checkStatementCanBeForwarded() const; diff --git a/tests/integration/test_async_load_databases/configs/config.xml b/tests/integration/test_async_load_databases/configs/config.xml index 858780faac9..d236ccf62f1 100644 --- a/tests/integration/test_async_load_databases/configs/config.xml +++ b/tests/integration/test_async_load_databases/configs/config.xml @@ -1,3 +1,4 @@ true + 10000 diff --git a/tests/integration/test_async_load_databases/test.py b/tests/integration/test_async_load_databases/test.py index acd3ef7455b..9701747cf4a 100644 --- a/tests/integration/test_async_load_databases/test.py +++ b/tests/integration/test_async_load_databases/test.py @@ -198,6 +198,7 @@ def test_multiple_tables(started_cluster): query(f"drop table test.table_{i} sync") +<<<<<<< HEAD def test_async_load_system_database(started_cluster): id = 1 for i in range(4): @@ -242,3 +243,25 @@ def test_async_load_system_database(started_cluster): for i in range(id - 1): node2.query(f"drop table if exists system.text_log_{i + 1}_test") node2.query(f"drop table if exists system.query_log_{i + 1}_test") +======= +def test_materialzed_views(started_cluster): + query = instance.query + query("create database test_mv") + query("create table test_mv.t (Id UInt64) engine=MergeTree order by Id") + query("create table test_mv.a (Id UInt64) engine=MergeTree order by Id") + query("create table test_mv.z (Id UInt64) engine=MergeTree order by Id") + query("create materialized view t_to_a to test_mv.a as select Id from test_mv.t") + query("create materialized view t_to_z to test_mv.z as select Id from test_mv.t") + + instance.restart_clickhouse() + query("insert into test_mv.t values(42)") + assert query("select * from test_mv.a Format CSV") == "42\n" + assert query("select * from test_mv.z Format CSV") == "42\n" + + query("drop materialized view t_to_a") + query("drop materialized view t_to_z") + query("drop table test_mv.t") + query("drop table test_mv.a") + query("drop table test_mv.z") + query("drop database test_mv") +>>>>>>> 4d4a53cc440 (mv_dependencies: initial)