Commit cb5fc34f29: merge 287aeba8eb into da2176d696
@@ -206,10 +206,12 @@ namespace DB
     DECLARE(UInt64, threadpool_writer_pool_size, 100, "Size of background pool for write requests to object storages", 0) \
     DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \
     DECLARE(UInt32, allowed_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \
+    DECLARE(UInt64, startup_mv_delay_ms, 0, "Debug parameter to simulate materialized view creation delay", 0) \
+
 
 
 // clang-format on
 
 /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below
 
 DECLARE_SETTINGS_TRAITS(ServerSettingsTraits, LIST_OF_SERVER_SETTINGS)
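The new entry follows the existing DECLARE pattern, so it is read through the same typed lookup as any other server setting. A minimal sketch, assuming a ClickHouse-style Context; the helper name is hypothetical, while the operator[] lookup matches the usage added to StorageMaterializedView::startup() later in this diff:

    // Hypothetical helper showing how the DECLARE'd setting is read.
    // ServerSetting::startup_mv_delay_ms is the extern symbol declared
    // in the StorageMaterializedView.cpp hunk below.
    UInt64 getConfiguredStartupMvDelay(const ContextPtr & context)
    {
        return context->getServerSettings()[ServerSetting::startup_mv_delay_ms];
    }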
@@ -209,6 +209,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
 {
     size_t prev_tables_count = metadata.parsed_tables.size();
     size_t prev_total_dictionaries = metadata.total_dictionaries;
+    size_t prev_total_materialized_views = metadata.total_materialized_views;
 
     auto process_metadata = [&metadata, is_startup, local_context, this](const String & file_name)
     {
@@ -276,6 +277,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
             std::lock_guard lock{metadata.mutex};
             metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
             metadata.total_dictionaries += create_query->is_dictionary;
+            metadata.total_materialized_views += create_query->is_materialized_view;
         }
     }
     catch (Exception & e)
@@ -289,10 +291,17 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
 
     size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count;
     size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries;
+    size_t materialized_views_in_database = metadata.total_materialized_views - prev_total_materialized_views;
     size_t tables_in_database = objects_in_database - dictionaries_in_database;
 
-    LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
-        TSA_SUPPRESS_WARNING_FOR_READ(database_name), tables_in_database, dictionaries_in_database);
+    LOG_INFO(log, "Metadata processed, database {} has {} tables, {} dictionaries and {} materialized views in total.",
+        TSA_SUPPRESS_WARNING_FOR_READ(database_name), tables_in_database, dictionaries_in_database, materialized_views_in_database);
+
+    // if (materialized_views_in_database)
+    // {
+
+    // }
+
 }
 
 void DatabaseOrdinary::loadTableFromMetadata(
@@ -317,6 +326,8 @@ void DatabaseOrdinary::loadTableFromMetadata(
         mode);
 
     attachTable(local_context, table_name, table, getTableDataPath(query));
+
+    table->pushDependencies();
 }
 catch (Exception & e)
 {
@@ -434,6 +434,8 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
     for (auto metric : getAttachedCountersForStorage(table))
         CurrentMetrics::add(metric);
 }
+
+    // if (DatabaseCatalog::iiMa
 }
 
 void DatabaseWithOwnTablesBase::shutdown()
@@ -44,6 +44,7 @@ struct ParsedTablesMetadata
 
     /// For logging
     size_t total_dictionaries = 0;
+    size_t total_materialized_views = 0;
 };
 
 /// Loads tables (and dictionaries) from specified databases
@@ -572,6 +572,11 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part moves between shards are not supported by storage {}", getName());
     }
 
+    virtual void pushDependencies()
+    {
+    }
+
+
     /** If the table have to do some complicated work on startup,
       * that must be postponed after creation of table object
       * (like launching some background threads),
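The new virtual is a deliberate no-op hook: IStorage supplies an empty default, StorageMaterializedView overrides it (see the StorageMaterializedView.cpp hunk below), and DatabaseOrdinary::loadTableFromMetadata calls it right after attachTable, so view dependencies are registered at metadata-load time rather than only in startup(). A minimal sketch of the pattern with simplified, hypothetical names:

    // Simplified illustration of the hook introduced here; the real code
    // uses IStorage / StorageMaterializedView and the DatabaseCatalog.
    struct StorageBase
    {
        virtual ~StorageBase() = default;
        virtual void pushDependencies() {}  // default: most storages track nothing
    };

    struct MaterializedViewStorage : StorageBase
    {
        bool dependencies_are_tracked = false;
        void pushDependencies() override
        {
            if (!dependencies_are_tracked)  // idempotent: load and startup may both call it
            {
                // register the source-table -> view edge with the catalog here
                dependencies_are_tracked = true;
            }
        }
    };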
@@ -187,6 +187,7 @@ StorageKafka::StorageKafka(
     , thread_per_consumer((*kafka_settings)[KafkaSetting::kafka_thread_per_consumer].value)
     , collection_name(collection_name_)
 {
+    LOG_TRACE(log, "Top of StorageKafka ctor.");
     kafka_settings->sanityCheck();
 
     if ((*kafka_settings)[KafkaSetting::kafka_handle_error_mode] == StreamingHandleErrorMode::STREAM)
@@ -524,6 +525,7 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
 
 void StorageKafka::threadFunc(size_t idx)
 {
+    LOG_DEBUG(log, "Top of StorageKafka::threadFunc");
     assert(idx < tasks.size());
     auto task = tasks[idx];
     std::string exception_str;
@@ -533,15 +535,18 @@ void StorageKafka::threadFunc(size_t idx)
         auto table_id = getStorageID();
         // Check if at least one direct dependency is attached
         size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size();
+        LOG_DEBUG(log, "StorageKafka::threadFunc - before if");
         if (num_views)
         {
             auto start_time = std::chrono::steady_clock::now();
 
             mv_attached.store(true);
 
+            LOG_DEBUG(log, "StorageKafka::threadFunc - before while");
             // Keep streaming as long as there are attached views and streaming is not cancelled
             while (!task->stream_cancelled)
             {
+                LOG_DEBUG(log, "StorageKafka::threadFunc - before StorageKafkaUtils::checkDependencies");
                 if (!StorageKafkaUtils::checkDependencies(table_id, getContext()))
                     break;
 
@@ -564,6 +569,9 @@ void StorageKafka::threadFunc(size_t idx)
                 }
             }
         }
+        else
+            LOG_DEBUG(log, "No attached views");
+
     }
     catch (...)
     {
@@ -425,15 +425,19 @@ bool checkDependencies(const StorageID & table_id, const ContextPtr& context)
     // Check the dependencies are ready?
     for (const auto & view_id : view_ids)
     {
+        LOG_TRACE(&Poco::Logger::get("kafka checkDependencies"), "Top of for");
+
         auto view = DatabaseCatalog::instance().tryGetTable(view_id, context);
         if (!view)
             return false;
 
+        LOG_TRACE(&Poco::Logger::get("kafka checkDependencies"), "Target table");
         // If it materialized view, check it's target table
         auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
         if (materialized_view && !materialized_view->tryGetTargetTable())
             return false;
 
+        LOG_TRACE(&Poco::Logger::get("kafka checkDependencies"), "Transitive dependencies");
         // Check all its dependencies
         if (!checkDependencies(view_id, context))
             return false;
@@ -29,6 +29,7 @@
 
 #include <Common/typeid_cast.h>
 #include <Common/checkStackSize.h>
+#include <Common/randomSeed.h>
 #include <Core/ServerSettings.h>
 #include <Core/Settings.h>
 #include <QueryPipeline/Pipe.h>
@@ -40,6 +41,8 @@
 
 #include <Backups/BackupEntriesCollector.h>
 
+#include <Common/logger_useful.h>
+
 namespace DB
 {
 namespace Setting
@@ -51,6 +54,7 @@ namespace Setting
 namespace ServerSetting
 {
     extern const ServerSettingsUInt64 max_materialized_views_count_for_table;
+    extern const ServerSettingsUInt64 startup_mv_delay_ms;
 }
 
 namespace RefreshSetting
@@ -745,12 +749,34 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
         refresher->rename(new_table_id, getTargetTableId());
 }
 
+void StorageMaterializedView::pushDependencies()
+{
+    // assert(!dependencies_are_tracked);
+    if (!dependencies_are_tracked)
+    {
+        auto metadata_snapshot = getInMemoryMetadataPtr();
+        const auto & select_query = metadata_snapshot->getSelectQuery();
+        if (!select_query.select_table_id.empty())
+            DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID());
+        dependencies_are_tracked = true;
+    }
+}
+
 void StorageMaterializedView::startup()
 {
-    auto metadata_snapshot = getInMemoryMetadataPtr();
-    const auto & select_query = metadata_snapshot->getSelectQuery();
-    if (!select_query.select_table_id.empty())
-        DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID());
+    if (const auto configured_delay_ms = getContext()->getServerSettings()[ServerSetting::startup_mv_delay_ms]; configured_delay_ms)
+    {
+        pcg64_fast gen{randomSeed()};
+        const auto delay_ms = std::uniform_int_distribution<>(0, 1)(gen) ? configured_delay_ms : 0UL;
+        if (delay_ms)
+        {
+            LOG_DEBUG(&Poco::Logger::get("StorageMaterializedView"), "sleeping in startup of {}", getStorageID().table_name);
+            std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+            LOG_DEBUG(&Poco::Logger::get("StorageMaterializedView"), "woken up in startup of {}", getStorageID().table_name);
+        }
+    }
+
+    pushDependencies();
+
     if (refresher)
         refresher->startup();
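The debug delay flips a coin per materialized view: std::uniform_int_distribution<>(0, 1) yields 0 or 1 with equal probability, so roughly half of the views sleep for the configured interval while the rest start immediately, randomizing startup order under async database loading. A self-contained sketch of the same logic, with pcg64_fast/randomSeed() swapped for standard-library equivalents:

    #include <chrono>
    #include <cstdint>
    #include <random>
    #include <thread>

    // Sleep for configured_delay_ms on a random 50/50 coin flip, mirroring
    // the startup_mv_delay_ms debug behaviour added above.
    void maybeDelayStartup(uint64_t configured_delay_ms)
    {
        if (!configured_delay_ms)
            return;  // setting left at its default of 0: no delay
        std::mt19937_64 gen{std::random_device{}()};
        const uint64_t delay_ms = std::uniform_int_distribution<>(0, 1)(gen) ? configured_delay_ms : 0;
        if (delay_ms)
            std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    }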
@@ -67,6 +67,7 @@ public:
 
     void renameInMemory(const StorageID & new_table_id) override;
 
+    void pushDependencies() override;
     void startup() override;
     void shutdown(bool is_drop) override;
 
@@ -116,6 +117,8 @@ private:
     /// have UUID, and we do inner table lookup by name instead.
     bool fixed_uuid = true;
 
+    bool dependencies_are_tracked = false;
+
     friend class RefreshTask;
 
     void checkStatementCanBeForwarded() const;
@@ -1,3 +1,4 @@
 <clickhouse>
     <async_load_databases>true</async_load_databases>
+    <startup_mv_delay_ms>10000</startup_mv_delay_ms>
 </clickhouse>
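Combined with async_load_databases, the 10-second delay widens the window in which a source table can be attached and written to before its dependent views finish starting, which is what the new tests below exercise. A hypothetical way to confirm the value took effect on a running node; the comment about dumpToSystemServerSettingsColumns earlier in this diff suggests the setting is exported to system.server_settings:

    SELECT name, value, changed
    FROM system.server_settings
    WHERE name = 'startup_mv_delay_ms'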
@@ -242,3 +242,80 @@ def test_async_load_system_database(started_cluster):
     for i in range(id - 1):
         node2.query(f"drop table if exists system.text_log_{i + 1}_test")
         node2.query(f"drop table if exists system.query_log_{i + 1}_test")
+
+
+def test_materialized_views(started_cluster):
+    query = node1.query
+    query("create database test_mv")
+    query("create table test_mv.t (Id UInt64) engine=MergeTree order by Id")
+    query("create table test_mv.a (Id UInt64) engine=MergeTree order by Id")
+    query("create table test_mv.z (Id UInt64) engine=MergeTree order by Id")
+    query("create materialized view t_to_a to test_mv.a as select Id from test_mv.t")
+    query("create materialized view t_to_z to test_mv.z as select Id from test_mv.t")
+
+    node1.restart_clickhouse()
+    query("insert into test_mv.t values(42)")
+    assert query("select * from test_mv.a Format CSV") == "42\n"
+    assert query("select * from test_mv.z Format CSV") == "42\n"
+
+    query("drop view t_to_a")
+    query("drop view t_to_z")
+    query("drop table test_mv.t")
+    query("drop table test_mv.a")
+    query("drop table test_mv.z")
+    query("drop database test_mv")
+
+
+def test_materialized_views_cascaded(started_cluster):
+    query = node1.query
+    query("create database test_mv")
+    query("create table test_mv.t (Id UInt64) engine=MergeTree order by Id")
+    query("create table test_mv.a (Id UInt64) engine=MergeTree order by Id")
+    query("create table test_mv.z (Id UInt64) engine=MergeTree order by Id")
+    query("create materialized view t_to_a to test_mv.a as select Id from test_mv.t")
+    query("create materialized view a_to_z to test_mv.z as select Id from test_mv.a")
+
+    node1.restart_clickhouse()
+    query("insert into test_mv.t values(42)")
+    assert query("select * from test_mv.a Format CSV") == "42\n"
+    assert query("select * from test_mv.z Format CSV") == "42\n"
+
+    query("drop view t_to_a")
+    query("drop view a_to_z")
+    query("drop table test_mv.t")
+    query("drop table test_mv.a")
+    query("drop table test_mv.z")
+    query("drop database test_mv")
+
+
+def test_materialized_views_cascaded_multiple(started_cluster):
+    query = node1.query
+    query("create database test_mv")
+    query("create table test_mv.t (Id UInt64) engine=MergeTree order by Id")
+    query("create table test_mv.a (Id UInt64) engine=MergeTree order by Id")
+    query("create table test_mv.x (IdText String) engine=MergeTree order by IdText")
+    query(
+        "create table test_mv.z (Id UInt64, IdTextLength UInt64) engine=MergeTree order by Id"
+    )
+    query("create materialized view t_to_a to test_mv.a as select Id from test_mv.t")
+    query(
+        "create materialized view t_to_x to test_mv.x as select toString(Id) as IdText from test_mv.t"
+    )
+    query(
+        "create materialized view ax_to_z to test_mv.z as select Id, (select max(length(IdText)) from test_mv.x) as IdTextLength from test_mv.a"
+    )
+
+    node1.restart_clickhouse()
+    query("insert into test_mv.t values(42)")
+    assert query("select * from test_mv.a Format CSV") == "42\n"
+    assert query("select * from test_mv.x Format CSV") == '"42"\n'
+    assert query("select * from test_mv.z Format CSV") == "42,2\n"
+
+    query("drop view t_to_a")
+    query("drop view t_to_x")
+    query("drop view ax_to_z")
+    query("drop table test_mv.t")
+    query("drop table test_mv.a")
+    query("drop table test_mv.x")
+    query("drop table test_mv.z")
+    query("drop database test_mv")
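All three tests follow the same shape: build a chain of materialized views, restart the server, then insert through the source table. The asserts only pass if every view re-registered its dependency during metadata load (via pushDependencies) despite the randomized startup delay. Condensed into a hypothetical helper reusing the suite's node1 fixture:

    # Hypothetical condensation of the post-restart check used above.
    def assert_mv_chain_alive(query):
        query("insert into test_mv.t values(42)")
        # Rows reach the target tables only if each materialized view
        # re-established its dependency on test_mv.t after the restart.
        assert query("select * from test_mv.a Format CSV") == "42\n"
        assert query("select * from test_mv.z Format CSV") == "42\n"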