From bc019649733ee01fa291dae9ab9f16a38fd066a0 Mon Sep 17 00:00:00 2001 From: Vxider Date: Fri, 21 Feb 2020 01:30:58 +0800 Subject: [PATCH] optimize write --- .../AddingTimestampBlockInputStream.h | 39 -------- .../Storages/WindowView/StorageWindowView.cpp | 88 +++++++++++-------- .../Storages/WindowView/StorageWindowView.h | 4 +- .../WindowView/WindowViewBlockInputStream.h | 11 +-- 4 files changed, 53 insertions(+), 89 deletions(-) delete mode 100644 dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h diff --git a/dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h b/dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h deleted file mode 100644 index a307fccc3bc..00000000000 --- a/dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ -/** Add timestamp column for processing time process in - * WINDOW VIEW - */ -class AddingTimestampBlockInputStream : public IBlockInputStream -{ -public: - AddingTimestampBlockInputStream(const BlockInputStreamPtr & input_, UInt32 timestamp_) : input(input_), timestamp(timestamp_) - { - cached_header = input->getHeader(); - cached_header.insert({ColumnUInt32::create(1, 1), std::make_shared(), "____timestamp"}); - } - - String getName() const override { return "AddingTimestamp"; } - - Block getHeader() const override { return cached_header.cloneEmpty(); } - -protected: - Block readImpl() override - { - Block res = input->read(); - if (res) - res.insert({ColumnUInt32::create(res.rows(), timestamp), std::make_shared(), "____timestamp"}); - return res; - } - -private: - BlockInputStreamPtr input; - Block cached_header; - UInt32 timestamp; -}; -} diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 9d2e9f4680e..1da8c620468 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -38,7 +39,6 @@ #include #include -#include #include #include #include @@ -350,6 +350,11 @@ inline void StorageWindowView::cleanCache() } mergeable_blocks->remove_if([](BlocksListPtr & ptr) { return ptr->size() == 0; }); } + + { + std::lock_guard lock(fire_signal_mutex); + watch_streams.remove_if([](std::weak_ptr & ptr) { return ptr.expired(); }); + } } inline void StorageWindowView::flushToTable(UInt32 timestamp_) @@ -487,10 +492,10 @@ inline void StorageWindowView::addFireSignal(UInt32 timestamp_) { if (!target_table_id.empty()) fire_signal.push_back(timestamp_); - for (auto watch_stream : watch_streams) + for (auto & watch_stream : watch_streams) { - if (watch_stream) - watch_stream->addFireSignal(timestamp_); + if (auto watch_stream_ = watch_stream.lock()) + watch_stream_->addFireSignal(timestamp_); } condition.notify_all(); } @@ -584,7 +589,7 @@ BlockInputStreams StorageWindowView::watch( { std::lock_guard lock(fire_signal_mutex); - watch_streams.push_back(reader.get()); + watch_streams.push_back(reader); } processed_stage = QueryProcessingStage::Complete; @@ -707,6 +712,27 @@ StorageWindowView::StorageWindowView( fetch_column_query = generateFetchColumnsQuery(inner_table_id); } + { + // generate write expressions + ColumnsWithTypeAndName columns__; + columns__.emplace_back( + nullptr, + std::make_shared(DataTypes{std::make_shared(), std::make_shared()}), + window_column_name); + columns__.emplace_back(nullptr, std::make_shared(), "____timestamp"); + columns__.emplace_back(nullptr, std::make_shared(), "____watermark"); + const auto & function_tuple = FunctionFactory::instance().get("tupleElement", global_context); + writeExpressions = std::make_shared(columns__, global_context); + writeExpressions->add(ExpressionAction::addColumn( + {std::make_shared()->createColumnConst(1, toField(2)), std::make_shared(), "____tuple_arg"})); + writeExpressions->add(ExpressionAction::applyFunction(function_tuple, Names{window_column_name, "____tuple_arg"}, "____w_end")); + writeExpressions->add(ExpressionAction::removeColumn("____tuple_arg")); + + const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", global_context); + writeExpressions->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter")); + writeExpressions->add(ExpressionAction::removeColumn("____watermark")); + } + toTableTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncToTable(); }); cleanCacheTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); }); fireTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFire(); }); @@ -748,41 +774,27 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query) void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context) { UInt32 timestamp_now = std::time(nullptr); + UInt32 watermark; auto block_stream = std::make_shared(block); - BlockInputStreams streams; + BlockInputStreamPtr source_stream; if (window_view.is_proctime_tumble) - streams = {std::make_shared(block_stream, timestamp_now)}; + { + source_stream = std::make_shared>(block_stream, std::make_shared(), timestamp_now, "____timestamp"); + watermark = window_view.getWindowLowerBound(timestamp_now); + } else - streams = {block_stream}; + { + source_stream = block_stream; + watermark = window_view.getWatermark(timestamp_now); + } - auto window_proxy_storage = std::make_shared( - StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns); InterpreterSelectQuery select_block( - window_view.getFinalQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); - auto data_mergeable_stream = std::make_shared(select_block.execute().in); - - // extract ____w_end - ColumnsWithTypeAndName columns_; - columns_.emplace_back( - nullptr, - std::make_shared(DataTypes{std::make_shared(), std::make_shared()}), - window_view.window_column_name); - const auto & function_tuple = FunctionFactory::instance().get("tupleElement", context); - ExpressionActionsPtr actions_ = std::make_shared(columns_, context); - actions_->add(ExpressionAction::addColumn( - {std::make_shared()->createColumnConst(1, toField(2)), std::make_shared(), "____tuple_arg"})); - actions_->add(ExpressionAction::applyFunction(function_tuple, Names{window_view.window_column_name, "____tuple_arg"}, "____w_end")); - actions_->add(ExpressionAction::removeColumn("____tuple_arg")); + window_view.getFinalQuery(), context, source_stream, QueryProcessingStage::WithMergeableState); + source_stream = std::make_shared(select_block.execute().in); + source_stream = std::make_shared>(source_stream, std::make_shared(), watermark, "____watermark"); BlockInputStreamPtr in_stream; - UInt32 watermark = window_view.getWatermark(timestamp_now); - actions_->add(ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(watermark)), - std::make_shared(), - "____watermark"})); - const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context); - actions_->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter")); - actions_->add(ExpressionAction::removeColumn("____watermark")); - in_stream = std::make_shared(data_mergeable_stream, actions_, "____filter", true); + in_stream = std::make_shared(source_stream, window_view.writeExpressions, "____filter", true); if (!window_view.inner_table_id.empty()) { @@ -841,11 +853,9 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con while (Block block_ = in_stream->read()) new_mergeable_blocks->push_back(std::move(block_)); } - if (!new_mergeable_blocks->empty()) - { - std::unique_lock lock(window_view.mutex); - window_view.getMergeableBlocksList()->push_back(new_mergeable_blocks); - } + + std::unique_lock lock(window_view.mutex); + window_view.getMergeableBlocksList()->push_back(new_mergeable_blocks); } } @@ -854,7 +864,7 @@ void StorageWindowView::startup() // Start the working thread if (!target_table_id.empty()) toTableTask->activateAndSchedule(); - // cleanCacheTask->activateAndSchedule(); + cleanCacheTask->activateAndSchedule(); fireTask->activateAndSchedule(); } diff --git a/dbms/src/Storages/WindowView/StorageWindowView.h b/dbms/src/Storages/WindowView/StorageWindowView.h index cfb9ad129ca..f1680811c12 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.h +++ b/dbms/src/Storages/WindowView/StorageWindowView.h @@ -66,7 +66,7 @@ private: UInt64 clean_interval; const DateLUTImpl & time_zone; std::list fire_signal; - std::list watch_streams; + std::list> watch_streams; std::condition_variable condition; BlocksListPtrs mergeable_blocks; @@ -96,6 +96,8 @@ private: BackgroundSchedulePool::TaskHolder cleanCacheTask; BackgroundSchedulePool::TaskHolder fireTask; + ExpressionActionsPtr writeExpressions; + ASTPtr innerQueryParser(ASTSelectQuery & inner_query); std::shared_ptr generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name); diff --git a/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h b/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h index d2bd151868a..250dc5ef057 100644 --- a/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h +++ b/dbms/src/Storages/WindowView/WindowViewBlockInputStream.h @@ -30,15 +30,6 @@ public: if (isCancelled() || storage->is_dropped) return; IBlockInputStream::cancel(kill); - std::lock_guard lock(storage->fire_signal_mutex); - for (auto it = storage->watch_streams.begin() ; it != storage->watch_streams.end() ; ++it) - { - if (*it == this) - { - storage->watch_streams.erase(it); - break; - } - } } Block getHeader() const override { return storage->getHeader(); } @@ -125,6 +116,6 @@ private: bool end_of_blocks = false; BlockInputStreamPtr in_stream; std::mutex fire_signal_mutex; - std::list fire_signal; + std::deque fire_signal; }; }