From 35600d6a9983ec3a4c72ec9f46668d4f05a10df9 Mon Sep 17 00:00:00 2001 From: Vxider Date: Mon, 17 Feb 2020 16:18:27 +0800 Subject: [PATCH] trauncate and optimize support --- dbms/src/Core/Defines.h | 2 +- dbms/src/Core/Settings.h | 2 +- .../Storages/WindowView/StorageWindowView.cpp | 43 +++++++++++++------ .../Storages/WindowView/StorageWindowView.h | 14 +++--- .../01052_window_view_event_time_watch.sql | 10 ++--- .../01055_window_view_proc_time_to.sql | 10 ++--- 6 files changed, 50 insertions(+), 31 deletions(-) diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index c84ab12a6b4..fca458c0e90 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -34,7 +34,7 @@ #define DEFAULT_MERGE_BLOCK_SIZE 8192 #define DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC 5 -#define DEFAULT_WINDOW_VIEW_INNER_TABLE_CLEAN_INTERVAL_SEC 5 +#define DEFAULT_WINDOW_VIEW_CLEAN_INTERVAL_SEC 5 #define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160) #define DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC 15 #define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024 diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index e27a9442a3e..0c0d5dd961e 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -387,7 +387,7 @@ struct Settings : public SettingsCollection M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.", 0) \ M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.", 0) \ M(SettingBool, allow_experimental_window_view, false, "Enable WINDOW VIEW. Not mature enough.", 0) \ - M(SettingSeconds, window_view_inner_table_clean_interval, DEFAULT_WINDOW_VIEW_INNER_TABLE_CLEAN_INTERVAL_SEC, "The clean interval of window view inner table in seconds to free outdated data.", 0) \ + M(SettingSeconds, window_view_clean_interval, DEFAULT_WINDOW_VIEW_CLEAN_INTERVAL_SEC, "The clean interval of window view in seconds to free outdated data.", 0) \ M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \ \ M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \ diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 0505bf536c8..8b5c5b8deae 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -276,7 +276,6 @@ static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context, { if (global_context.tryGetTable(target_table_id)) { - /// We create and execute `drop` query for internal table. auto drop_query = std::make_shared(); drop_query->database = target_table_id.database_name; drop_query->table = target_table_id.table_name; @@ -300,7 +299,26 @@ void StorageWindowView::drop(TableStructureWriteLockHolder &) condition.notify_all(); } -inline void StorageWindowView::clearInnerTable() +void StorageWindowView::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) +{ + if (!inner_table_id.empty()) + executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, inner_table_id); + else + { + std::lock_guard lock(mutex); + mergeable_blocks = std::make_shared>(); + } +} + +bool StorageWindowView::optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) +{ + if (inner_table_id.empty()) + throw Exception( + "OPTIMIZE only supported when creating WINDOW VIEW within INNER table.", ErrorCodes::INCORRECT_QUERY); + return getInnerStorage()->optimize(query, partition, final, deduplicate, context); +} + +inline void StorageWindowView::cleanCache() { //delete fired blocks UInt32 timestamp_now = std::time(nullptr); @@ -477,14 +495,14 @@ inline void StorageWindowView::addFireSignal(UInt32 timestamp_) condition.notify_all(); } -void StorageWindowView::threadFuncClearInnerTable() +void StorageWindowView::threadFuncCleanCache() { while (!shutdown_called) { try { - clearInnerTable(); - sleep(inner_table_clear_interval); + cleanCache(); + sleep(clean_interval); } catch (...) { @@ -493,7 +511,7 @@ void StorageWindowView::threadFuncClearInnerTable() } } if (!shutdown_called) - innerTableClearTask->scheduleAfter(RESCHEDULE_MS); + cleanCacheTask->scheduleAfter(RESCHEDULE_MS); } void StorageWindowView::threadFuncToTable() @@ -635,10 +653,8 @@ StorageWindowView::StorageWindowView( if (!query.to_table.empty()) target_table_id = StorageID(query.to_database, query.to_table); - inner_table_clear_interval = local_context.getSettingsRef().window_view_inner_table_clean_interval.totalSeconds(); - + clean_interval = local_context.getSettingsRef().window_view_clean_interval.totalSeconds(); mergeable_blocks = std::make_shared>(); - active_ptr = std::make_shared(true); if (query.watermark_function) @@ -692,10 +708,10 @@ StorageWindowView::StorageWindowView( } toTableTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncToTable(); }); - innerTableClearTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncClearInnerTable(); }); + cleanCacheTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); }); fireTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFire(); }); toTableTask->deactivate(); - innerTableClearTask->deactivate(); + cleanCacheTask->deactivate(); fireTask->deactivate(); } @@ -743,7 +759,6 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns); InterpreterSelectQuery select_block( window_view.getFinalQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); - auto data_mergeable_stream = std::make_shared(select_block.execute().in); // extract ____w_end @@ -837,7 +852,7 @@ void StorageWindowView::startup() // Start the working thread if (!target_table_id.empty()) toTableTask->activateAndSchedule(); - innerTableClearTask->activateAndSchedule(); + // cleanCacheTask->activateAndSchedule(); fireTask->activateAndSchedule(); } @@ -847,7 +862,7 @@ void StorageWindowView::shutdown() if (!shutdown_called.compare_exchange_strong(expected, true)) return; toTableTask->deactivate(); - innerTableClearTask->deactivate(); + cleanCacheTask->deactivate(); fireTask->deactivate(); } diff --git a/dbms/src/Storages/WindowView/StorageWindowView.h b/dbms/src/Storages/WindowView/StorageWindowView.h index 854e666710c..cfb9ad129ca 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.h +++ b/dbms/src/Storages/WindowView/StorageWindowView.h @@ -32,6 +32,10 @@ public: void drop(TableStructureWriteLockHolder &) override; + void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override; + + bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override; + void startup() override; void shutdown() override; @@ -54,12 +58,12 @@ private: ASTPtr inner_query; ASTPtr final_query; ASTPtr fetch_column_query; - + Context & global_context; bool is_proctime_tumble{false}; std::atomic shutdown_called{false}; mutable Block sample_block; - UInt64 inner_table_clear_interval; + UInt64 clean_interval; const DateLUTImpl & time_zone; std::list fire_signal; std::list watch_streams; @@ -89,7 +93,7 @@ private: StoragePtr target_storage; BackgroundSchedulePool::TaskHolder toTableTask; - BackgroundSchedulePool::TaskHolder innerTableClearTask; + BackgroundSchedulePool::TaskHolder cleanCacheTask; BackgroundSchedulePool::TaskHolder fireTask; ASTPtr innerQueryParser(ASTSelectQuery & inner_query); @@ -102,9 +106,9 @@ private: Block getHeader() const; void flushToTable(UInt32 timestamp_); - void clearInnerTable(); + void cleanCache(); void threadFuncToTable(); - void threadFuncClearInnerTable(); + void threadFuncCleanCache(); void threadFuncFire(); void addFireSignal(UInt32 timestamp_); diff --git a/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql b/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql index ca29f1a36e6..58482549657 100644 --- a/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql +++ b/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql @@ -1,35 +1,35 @@ SET allow_experimental_window_view = 1; DROP TABLE IF EXISTS test.mt; +DROP TABLE IF EXISTS test.wv; SELECT '--TUMBLE--'; -DROP TABLE IF EXISTS test.wv; CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); CREATE WINDOW VIEW test.wv WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); WATCH test.wv LIMIT 1; +DROP TABLE test.wv; SELECT '--HOP--'; -DROP TABLE IF EXISTS test.wv; CREATE WINDOW VIEW test.wv WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); WATCH test.wv LIMIT 2; +DROP TABLE test.wv; SELECT '--INNER_TUMBLE--'; -DROP TABLE IF EXISTS test.wv; CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); WATCH test.wv LIMIT 1; +DROP TABLE test.wv; SELECT '--INNER_HOP--'; -DROP TABLE IF EXISTS test.wv; CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); WATCH test.wv LIMIT 2; - DROP TABLE test.wv; + DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql b/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql index 38373e9af1c..b660fcd4a5c 100644 --- a/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql +++ b/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql @@ -3,45 +3,45 @@ SET allow_experimental_window_view = 1; DROP TABLE IF EXISTS test.mt; DROP TABLE IF EXISTS test.dst; DROP TABLE IF EXISTS test.`.inner.wv`; +DROP TABLE IF EXISTS test.wv; CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple(); CREATE TABLE test.mt(a Int32) ENGINE=MergeTree ORDER BY tuple(); SELECT '--TUMBLE--'; -DROP TABLE IF EXISTS test.wv; TRUNCATE TABLE test.dst; CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; INSERT INTO test.mt VALUES (1); SELECT sleep(1); SELECT count FROM test.dst; +DROP TABLE test.wv; SELECT '--HOP--'; -DROP TABLE IF EXISTS test.wv; TRUNCATE TABLE test.dst; CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; INSERT INTO test.mt VALUES (1); SELECT sleep(2); SELECT count FROM test.dst; +DROP TABLE test.wv; SELECT '--INNER_TUMBLE--'; -DROP TABLE IF EXISTS test.wv; TRUNCATE TABLE test.dst; CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; INSERT INTO test.mt VALUES (1); SELECT sleep(1); SELECT count FROM test.dst; +DROP TABLE test.wv; SELECT '--INNER_HOP--'; -DROP TABLE IF EXISTS test.wv; TRUNCATE TABLE test.dst; CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; INSERT INTO test.mt VALUES (1); SELECT sleep(2); SELECT count FROM test.dst; - DROP TABLE test.wv; + DROP TABLE test.mt;