optimize write

This commit is contained in:
Vxider 2020-02-21 01:30:58 +08:00
parent 328150797d
commit bc01964973
4 changed files with 53 additions and 89 deletions

View File

@ -1,39 +0,0 @@
#pragma once
#include <Columns/ColumnsNumber.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Add timestamp column for processing time process in
* WINDOW VIEW
*/
class AddingTimestampBlockInputStream : public IBlockInputStream
{
public:
AddingTimestampBlockInputStream(const BlockInputStreamPtr & input_, UInt32 timestamp_) : input(input_), timestamp(timestamp_)
{
cached_header = input->getHeader();
cached_header.insert({ColumnUInt32::create(1, 1), std::make_shared<DataTypeDateTime>(), "____timestamp"});
}
String getName() const override { return "AddingTimestamp"; }
Block getHeader() const override { return cached_header.cloneEmpty(); }
protected:
Block readImpl() override
{
Block res = input->read();
if (res)
res.insert({ColumnUInt32::create(res.rows(), timestamp), std::make_shared<DataTypeDateTime>(), "____timestamp"});
return res;
}
private:
BlockInputStreamPtr input;
Block cached_header;
UInt32 timestamp;
};
}

View File

@ -1,6 +1,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/AddingConstColumnBlockInputStream.h>
#include <DataStreams/BlocksBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
@ -38,7 +39,6 @@
#include <boost/lexical_cast.hpp>
#include <Common/typeid_cast.h>
#include <Storages/WindowView/AddingTimestampBlockInputStream.h>
#include <Storages/WindowView/BlocksListInputStream.h>
#include <Storages/WindowView/StorageWindowView.h>
#include <Storages/WindowView/WindowViewBlockInputStream.h>
@ -350,6 +350,11 @@ inline void StorageWindowView::cleanCache()
}
mergeable_blocks->remove_if([](BlocksListPtr & ptr) { return ptr->size() == 0; });
}
{
std::lock_guard lock(fire_signal_mutex);
watch_streams.remove_if([](std::weak_ptr<WindowViewBlockInputStream> & ptr) { return ptr.expired(); });
}
}
inline void StorageWindowView::flushToTable(UInt32 timestamp_)
@ -487,10 +492,10 @@ inline void StorageWindowView::addFireSignal(UInt32 timestamp_)
{
if (!target_table_id.empty())
fire_signal.push_back(timestamp_);
for (auto watch_stream : watch_streams)
for (auto & watch_stream : watch_streams)
{
if (watch_stream)
watch_stream->addFireSignal(timestamp_);
if (auto watch_stream_ = watch_stream.lock())
watch_stream_->addFireSignal(timestamp_);
}
condition.notify_all();
}
@ -584,7 +589,7 @@ BlockInputStreams StorageWindowView::watch(
{
std::lock_guard lock(fire_signal_mutex);
watch_streams.push_back(reader.get());
watch_streams.push_back(reader);
}
processed_stage = QueryProcessingStage::Complete;
@ -707,6 +712,27 @@ StorageWindowView::StorageWindowView(
fetch_column_query = generateFetchColumnsQuery(inner_table_id);
}
{
// generate write expressions
ColumnsWithTypeAndName columns__;
columns__.emplace_back(
nullptr,
std::make_shared<DataTypeTuple>(DataTypes{std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>()}),
window_column_name);
columns__.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____timestamp");
columns__.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____watermark");
const auto & function_tuple = FunctionFactory::instance().get("tupleElement", global_context);
writeExpressions = std::make_shared<ExpressionActions>(columns__, global_context);
writeExpressions->add(ExpressionAction::addColumn(
{std::make_shared<DataTypeUInt8>()->createColumnConst(1, toField(2)), std::make_shared<DataTypeUInt8>(), "____tuple_arg"}));
writeExpressions->add(ExpressionAction::applyFunction(function_tuple, Names{window_column_name, "____tuple_arg"}, "____w_end"));
writeExpressions->add(ExpressionAction::removeColumn("____tuple_arg"));
const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", global_context);
writeExpressions->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter"));
writeExpressions->add(ExpressionAction::removeColumn("____watermark"));
}
toTableTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncToTable(); });
cleanCacheTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); });
fireTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFire(); });
@ -748,41 +774,27 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context)
{
UInt32 timestamp_now = std::time(nullptr);
UInt32 watermark;
auto block_stream = std::make_shared<OneBlockInputStream>(block);
BlockInputStreams streams;
BlockInputStreamPtr source_stream;
if (window_view.is_proctime_tumble)
streams = {std::make_shared<AddingTimestampBlockInputStream>(block_stream, timestamp_now)};
{
source_stream = std::make_shared<AddingConstColumnBlockInputStream<UInt32>>(block_stream, std::make_shared<DataTypeDateTime>(), timestamp_now, "____timestamp");
watermark = window_view.getWindowLowerBound(timestamp_now);
}
else
streams = {block_stream};
{
source_stream = block_stream;
watermark = window_view.getWatermark(timestamp_now);
}
auto window_proxy_storage = std::make_shared<WindowViewProxyStorage>(
StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(
window_view.getFinalQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(select_block.execute().in);
// extract ____w_end
ColumnsWithTypeAndName columns_;
columns_.emplace_back(
nullptr,
std::make_shared<DataTypeTuple>(DataTypes{std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>()}),
window_view.window_column_name);
const auto & function_tuple = FunctionFactory::instance().get("tupleElement", context);
ExpressionActionsPtr actions_ = std::make_shared<ExpressionActions>(columns_, context);
actions_->add(ExpressionAction::addColumn(
{std::make_shared<DataTypeUInt8>()->createColumnConst(1, toField(2)), std::make_shared<DataTypeUInt8>(), "____tuple_arg"}));
actions_->add(ExpressionAction::applyFunction(function_tuple, Names{window_view.window_column_name, "____tuple_arg"}, "____w_end"));
actions_->add(ExpressionAction::removeColumn("____tuple_arg"));
window_view.getFinalQuery(), context, source_stream, QueryProcessingStage::WithMergeableState);
source_stream = std::make_shared<MaterializingBlockInputStream>(select_block.execute().in);
source_stream = std::make_shared<AddingConstColumnBlockInputStream<UInt32>>(source_stream, std::make_shared<DataTypeDateTime>(), watermark, "____watermark");
BlockInputStreamPtr in_stream;
UInt32 watermark = window_view.getWatermark(timestamp_now);
actions_->add(ExpressionAction::addColumn({std::make_shared<DataTypeUInt32>()->createColumnConst(1, toField(watermark)),
std::make_shared<DataTypeDateTime>(),
"____watermark"}));
const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context);
actions_->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter"));
actions_->add(ExpressionAction::removeColumn("____watermark"));
in_stream = std::make_shared<FilterBlockInputStream>(data_mergeable_stream, actions_, "____filter", true);
in_stream = std::make_shared<FilterBlockInputStream>(source_stream, window_view.writeExpressions, "____filter", true);
if (!window_view.inner_table_id.empty())
{
@ -841,12 +853,10 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
while (Block block_ = in_stream->read())
new_mergeable_blocks->push_back(std::move(block_));
}
if (!new_mergeable_blocks->empty())
{
std::unique_lock lock(window_view.mutex);
window_view.getMergeableBlocksList()->push_back(new_mergeable_blocks);
}
}
}
void StorageWindowView::startup()
@ -854,7 +864,7 @@ void StorageWindowView::startup()
// Start the working thread
if (!target_table_id.empty())
toTableTask->activateAndSchedule();
// cleanCacheTask->activateAndSchedule();
cleanCacheTask->activateAndSchedule();
fireTask->activateAndSchedule();
}

View File

@ -66,7 +66,7 @@ private:
UInt64 clean_interval;
const DateLUTImpl & time_zone;
std::list<UInt32> fire_signal;
std::list<WindowViewBlockInputStream *> watch_streams;
std::list<std::weak_ptr<WindowViewBlockInputStream>> watch_streams;
std::condition_variable condition;
BlocksListPtrs mergeable_blocks;
@ -96,6 +96,8 @@ private:
BackgroundSchedulePool::TaskHolder cleanCacheTask;
BackgroundSchedulePool::TaskHolder fireTask;
ExpressionActionsPtr writeExpressions;
ASTPtr innerQueryParser(ASTSelectQuery & inner_query);
std::shared_ptr<ASTCreateQuery> generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name);

View File

@ -30,15 +30,6 @@ public:
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
std::lock_guard lock(storage->fire_signal_mutex);
for (auto it = storage->watch_streams.begin() ; it != storage->watch_streams.end() ; ++it)
{
if (*it == this)
{
storage->watch_streams.erase(it);
break;
}
}
}
Block getHeader() const override { return storage->getHeader(); }
@ -125,6 +116,6 @@ private:
bool end_of_blocks = false;
BlockInputStreamPtr in_stream;
std::mutex fire_signal_mutex;
std::list<UInt32> fire_signal;
std::deque<UInt32> fire_signal;
};
}