fix style
This commit is contained in:
feng lv 2021-09-28 05:20:02 +00:00
parent ef5c71a9ab
commit 1f90c8dc22
2 changed files with 13 additions and 24 deletions

View File

@ -43,7 +43,7 @@ void FileLogDirectoryWatcher::onItemAdded(const Poco::DirectoryWatcher::Director
de.callback = "onItemAdded";
de.path = ev.item.path();
de.type = ev.event;
events.emplace_back(de);
events.push_back(de);
}
@ -54,7 +54,7 @@ void FileLogDirectoryWatcher::onItemRemoved(const Poco::DirectoryWatcher::Direct
de.callback = "onItemRemoved";
de.path = ev.item.path();
de.type = ev.event;
events.emplace_back(de);
events.push_back(de);
}
@ -65,7 +65,7 @@ void FileLogDirectoryWatcher::onItemModified(const Poco::DirectoryWatcher::Direc
de.callback = "onItemModified";
de.path = ev.item.path();
de.type = ev.event;
events.emplace_back(de);
events.push_back(de);
}
void FileLogDirectoryWatcher::onItemMovedFrom(const Poco::DirectoryWatcher::DirectoryEvent& ev)
@ -75,7 +75,7 @@ void FileLogDirectoryWatcher::onItemMovedFrom(const Poco::DirectoryWatcher::Dire
de.callback = "onItemMovedFrom";
de.path = ev.item.path();
de.type = ev.event;
events.emplace_back(de);
events.push_back(de);
}
void FileLogDirectoryWatcher::onItemMovedTo(const Poco::DirectoryWatcher::DirectoryEvent& ev)
@ -85,7 +85,7 @@ void FileLogDirectoryWatcher::onItemMovedTo(const Poco::DirectoryWatcher::Direct
de.callback = "onItemMovedTo";
de.path = ev.item.path();
de.type = ev.event;
events.emplace_back(de);
events.push_back(de);
}
void FileLogDirectoryWatcher::onError(const Poco::Exception & e)

View File

@ -11,7 +11,7 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Pipe.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Storages/FileLog/FileLogSource.h>
@ -528,7 +528,7 @@ bool StorageFileLog::streamToViews()
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
block_io.out->getHeader().getColumnsWithTypeAndName(),
block_io.pipeline.getHeader().getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto actions = std::make_shared<ExpressionActions>(
@ -538,28 +538,17 @@ bool StorageFileLog::streamToViews()
pipes.emplace_back(std::move(pipe));
}
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
auto input= Pipe::unitePipes(std::move(pipes));
assertBlocksHaveEqualStructure(pipeline.getHeader(), block_io.out->getHeader(), "StorageFileLog streamToViews");
assertBlocksHaveEqualStructure(input.getHeader(), block_io.pipeline.getHeader(), "StorageFileLog streamToViews");
size_t rows = 0;
block_io.pipeline.complete(std::move(input));
PullingPipelineExecutor executor(pipeline);
Block block;
block_io.out->writePrefix();
while (executor.pull(block))
{
block_io.out->write(block);
rows += block.rows();
/// During files open, also save file end at the opening moment
serialize(true);
}
block_io.out->writeSuffix();
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
LOG_DEBUG(log, "Pushing data to {} took {} ms.", table_id.getNameForLogs(), milliseconds);
return updateFileInfos();
}