Merge pull request #27577 from ClickHouse/remove-streams-from-lv

Remove streams from lv
This commit is contained in:
alexey-milovidov 2021-08-13 03:01:33 +03:00 committed by GitHub
commit f20eae9a45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 169 additions and 82 deletions

View File

@ -261,7 +261,6 @@ BlockIO InterpreterInsertQuery::execute()
{
InterpreterWatchQuery interpreter_watch{ query.watch, getContext() };
res = interpreter_watch.execute();
res.pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(std::move(res.in))));
}
for (size_t i = 0; i < out_streams_size; i++)

View File

@ -71,10 +71,9 @@ BlockIO InterpreterWatchQuery::execute()
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// Watch storage
streams = storage->watch(required_columns, query_info, getContext(), from_stage, max_block_size, max_streams);
auto pipe = storage->watch(required_columns, query_info, getContext(), from_stage, max_block_size, max_streams);
/// Constraints on the result, the quota on the result, and also callback for progress.
if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get()))
{
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
@ -82,11 +81,11 @@ BlockIO InterpreterWatchQuery::execute()
limits.size_limits.max_bytes = settings.max_result_bytes;
limits.size_limits.overflow_mode = settings.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(getContext()->getQuota());
pipe.setLimits(limits);
pipe.setQuota(getContext()->getQuota());
}
res.in = streams[0];
res.pipeline.init(std::move(pipe));
return res;
}

View File

@ -0,0 +1,27 @@
#include <Processors/Transforms/SquashingChunksTransform.h>
namespace DB
{
/// Constructs the transform.
/// The same `header` is passed for both IAccumulatingTransform constructor arguments,
/// and the size thresholds plus the `reserve_memory` flag are forwarded unchanged
/// to the wrapped SquashingTransform.
SquashingChunksTransform::SquashingChunksTransform(
    const Block & header,
    size_t min_block_size_rows,
    size_t min_block_size_bytes,
    bool reserve_memory)
    : IAccumulatingTransform(header, header)
    , squashing(min_block_size_rows, min_block_size_bytes, reserve_memory)
{
}
/// Feeds one incoming chunk into the squashing helper.
/// The chunk's columns are detached and re-attached to a clone of the input port header,
/// so the helper receives a Block. A chunk is published via setReadyChunk() only when
/// the helper returns a non-empty block.
void SquashingChunksTransform::consume(Chunk chunk)
{
    auto squashed = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));

    /// Nothing returned by the helper for this input: no output this time.
    if (!squashed)
        return;

    setReadyChunk(Chunk(squashed.getColumns(), squashed.rows()));
}
/// Produces the final chunk.
/// An empty block is passed to the squashing helper and whatever it returns is
/// converted to a Chunk (presumably the remaining accumulated data; confirm against
/// SquashingTransform::add).
Chunk SquashingChunksTransform::generate()
{
    const auto remainder = squashing.add({});
    return Chunk(remainder.getColumns(), remainder.rows());
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Processors/IAccumulatingTransform.h>
#include <DataStreams/SquashingTransform.h>
namespace DB
{
/** Accumulating transform that routes chunks through a SquashingTransform.
  *
  * Incoming chunks are handed to the wrapped `squashing` object in consume();
  * generate() emits the chunk obtained from it at the end.
  * The row/byte thresholds and `reserve_memory` are forwarded to SquashingTransform.
  */
class SquashingChunksTransform : public IAccumulatingTransform
{
public:
    explicit SquashingChunksTransform(
        const Block & header,
        size_t min_block_size_rows,
        size_t min_block_size_bytes,
        bool reserve_memory = false);

    /// Name reported for this processor.
    String getName() const override
    {
        return "SquashingTransform";
    }

protected:
    /// Accepts one input chunk.
    void consume(Chunk chunk) override;

    /// Returns the final output chunk.
    Chunk generate() override;

private:
    /// Block-based squashing helper that does the actual accumulation.
    SquashingTransform squashing;
};
}

View File

@ -165,6 +165,7 @@ SRCS(
Transforms/ReverseTransform.cpp
Transforms/RollupTransform.cpp
Transforms/SortingTransform.cpp
Transforms/SquashingChunksTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/WindowTransform.cpp
Transforms/getSourceFromFromASTInsertQuery.cpp

View File

@ -264,7 +264,7 @@ public:
*
* It is guaranteed that the structure of the table will not change over the lifetime of the returned pipe (that is, there will not be ALTER, RENAME and DROP).
*/
virtual BlockInputStreams watch(
virtual Pipe watch(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,

View File

@ -16,7 +16,7 @@ limitations under the License. */
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Storages/LiveView/StorageLiveView.h>
@ -27,7 +27,7 @@ namespace DB
* Keeps stream alive by outputting blocks with no rows
* based on period specified by the heartbeat interval.
*/
class LiveViewEventsBlockInputStream : public IBlockInputStream
class LiveViewEventsSource : public SourceWithProgress
{
using NonBlockingResult = std::pair<Block, bool>;
@ -35,13 +35,14 @@ using NonBlockingResult = std::pair<Block, bool>;
public:
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
/// and LIMIT 0 just returns data without waiting for any updates
LiveViewEventsBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
LiveViewEventsSource(std::shared_ptr<StorageLiveView> storage_,
std::shared_ptr<BlocksPtr> blocks_ptr_,
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
std::shared_ptr<bool> active_ptr_,
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
: SourceWithProgress({ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "version")}),
storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
limit(limit_),
@ -51,22 +52,17 @@ public:
active = active_ptr.lock();
}
String getName() const override { return "LiveViewEventsBlockInputStream"; }
String getName() const override { return "LiveViewEventsSource"; }
void cancel(bool kill) override
void onCancel() override
{
if (isCancelled() || storage->shutdown_called)
return;
IBlockInputStream::cancel(kill);
std::lock_guard lock(storage->mutex);
storage->condition.notify_all();
}
Block getHeader() const override
{
return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "version")};
}
void refresh()
{
if (active && blocks && it == end)
@ -109,10 +105,11 @@ public:
return res;
}
protected:
Block readImpl() override
Chunk generate() override
{
/// try reading
return tryReadImpl(true).first;
auto block = tryReadImpl(true).first;
return Chunk(block.getColumns(), block.rows());
}
/** tryRead method attempts to read a block in either blocking
@ -170,7 +167,7 @@ protected:
if (!end_of_blocks)
{
end_of_blocks = true;
return { getHeader(), true };
return { getPort().getHeader(), true };
}
while (true)
{
@ -192,7 +189,7 @@ protected:
{
// repeat the event block as a heartbeat
last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
return { getHeader(), true };
return { getPort().getHeader(), true };
}
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Common/hex.h>
@ -9,19 +9,28 @@
namespace DB
{
class LiveViewBlockOutputStream : public IBlockOutputStream
class LiveViewSink : public SinkToStorage
{
public:
explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
/// _version column is added manually in sink.
/// Returns the given block with the column named "_version" erased.
/// The block is taken by value, so the caller's block is not modified.
/// The constructor uses this to derive the header passed to SinkToStorage from storage_.getHeader().
static Block updateHeader(Block block)
{
block.erase("_version");
return block;
}
void writePrefix() override
public:
explicit LiveViewSink(StorageLiveView & storage_) : SinkToStorage(updateHeader(storage_.getHeader())), storage(storage_) {}
String getName() const override { return "LiveViewSink"; }
void onStart() override
{
new_blocks = std::make_shared<Blocks>();
new_blocks_metadata = std::make_shared<BlocksMetadata>();
new_hash = std::make_shared<SipHash>();
}
void writeSuffix() override
void onFinish() override
{
UInt128 key;
String key_str;
@ -65,14 +74,13 @@ public:
new_hash.reset();
}
void write(const Block & block) override
void consume(Chunk chunk) override
{
new_blocks->push_back(block);
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
block.updateHash(*new_hash);
new_blocks->push_back(std::move(block));
}
Block getHeader() const override { return storage.getHeader(); }
private:
using SipHashPtr = std::shared_ptr<SipHash>;

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
@ -10,19 +11,20 @@ namespace DB
* Keeps stream alive by outputting blocks with no rows
* based on period specified by the heartbeat interval.
*/
class LiveViewBlockInputStream : public IBlockInputStream
class LiveViewSource : public SourceWithProgress
{
using NonBlockingResult = std::pair<Block, bool>;
public:
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
LiveViewSource(std::shared_ptr<StorageLiveView> storage_,
std::shared_ptr<BlocksPtr> blocks_ptr_,
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
std::shared_ptr<bool> active_ptr_,
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
: SourceWithProgress(storage_->getHeader())
, storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
active_ptr(std::move(active_ptr_)),
has_limit(has_limit_), limit(limit_),
@ -34,17 +36,15 @@ public:
/// Name reported for this source. Matches the class name, as LiveViewEventsSource does,
/// instead of the pre-rename "LiveViewBlockInputStream".
String getName() const override { return "LiveViewSource"; }
void cancel(bool kill) override
void onCancel() override
{
if (isCancelled() || storage->shutdown_called)
return;
IBlockInputStream::cancel(kill);
std::lock_guard lock(storage->mutex);
storage->condition.notify_all();
}
Block getHeader() const override { return storage->getHeader(); }
void refresh()
{
if (active && blocks && it == end)
@ -74,10 +74,11 @@ public:
}
protected:
Block readImpl() override
Chunk generate() override
{
/// try reading
return tryReadImpl(true).first;
auto block = tryReadImpl(true).first;
return Chunk(block.getColumns(), block.rows());
}
/** tryRead method attempts to read a block in either blocking
@ -135,7 +136,7 @@ protected:
if (!end_of_blocks)
{
end_of_blocks = true;
return { getHeader(), true };
return { getPort().getHeader(), true };
}
while (true)
{
@ -157,7 +158,7 @@ protected:
{
// heartbeat
last_event_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
return { getHeader(), true };
return { getPort().getHeader(), true };
}
}
}

View File

@ -15,10 +15,11 @@ limitations under the License. */
#include <Parsers/ASTLiteral.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sources/BlocksSource.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <DataStreams/copyData.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>
@ -26,9 +27,9 @@ limitations under the License. */
#include <Common/hex.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/LiveView/LiveViewBlockInputStream.h>
#include <Storages/LiveView/LiveViewBlockOutputStream.h>
#include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
#include <Storages/LiveView/LiveViewSource.h>
#include <Storages/LiveView/LiveViewSink.h>
#include <Storages/LiveView/LiveViewEventsSource.h>
#include <Storages/LiveView/StorageBlocks.h>
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
@ -110,15 +111,23 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
InterpreterSelectQuery interpreter(mergeable_query->clone(), local_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().getInputStream());
auto io = interpreter.execute();
io.pipeline.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<MaterializingTransform>(cur_header);
});
while (Block this_block = view_mergeable_stream->read())
new_mergeable_blocks->sample_block = io.pipeline.getHeader();
PullingPipelineExecutor executor(io.pipeline);
Block this_block;
while (executor.pull(this_block))
base_blocks->push_back(this_block);
new_blocks->push_back(base_blocks);
new_mergeable_blocks->blocks = new_blocks;
new_mergeable_blocks->sample_block = view_mergeable_stream->getHeader();
return new_mergeable_blocks;
}
@ -133,7 +142,7 @@ Pipes StorageLiveView::blocksToPipes(BlocksPtrs blocks, Block & sample_block)
}
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
QueryPipeline StorageLiveView::completeQuery(Pipes pipes)
{
//FIXME it's dangerous to create Context on stack
auto block_context = Context::createCopy(getContext());
@ -147,18 +156,25 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
std::move(pipes), QueryProcessingStage::WithMergeableState);
};
block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(getContext(), creator));
InterpreterSelectQuery select(getInnerBlocksQuery(), block_context, StoragePtr(), nullptr, SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().getInputStream());
auto io = select.execute();
io.pipeline.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<MaterializingTransform>(cur_header);
});
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
data = std::make_shared<SquashingBlockInputStream>(
data, getContext()->getSettingsRef().min_insert_block_size_rows,
getContext()->getSettingsRef().min_insert_block_size_bytes);
io.pipeline.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<SquashingChunksTransform>(
cur_header,
getContext()->getSettingsRef().min_insert_block_size_rows,
getContext()->getSettingsRef().min_insert_block_size_bytes);
});
return data;
return std::move(io.pipeline);
}
void StorageLiveView::writeIntoLiveView(
@ -166,7 +182,7 @@ void StorageLiveView::writeIntoLiveView(
const Block & block,
ContextPtr local_context)
{
BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view);
auto output = std::make_shared<LiveViewSink>(live_view);
/// Check if live view has any readers if not
/// just reset blocks to empty and do nothing else
@ -220,10 +236,16 @@ void StorageLiveView::writeIntoLiveView(
InterpreterSelectQuery select_block(mergeable_query, local_context, blocks_storage.getTable(), blocks_storage.getTable()->getInMemoryMetadataPtr(),
QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().getInputStream());
auto io = select_block.execute();
io.pipeline.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<MaterializingTransform>(cur_header);
});
while (Block this_block = data_mergeable_stream->read())
PullingPipelineExecutor executor(io.pipeline);
Block this_block;
while (executor.pull(this_block))
new_mergeable_blocks->push_back(this_block);
if (new_mergeable_blocks->empty())
@ -238,8 +260,15 @@ void StorageLiveView::writeIntoLiveView(
}
}
BlockInputStreamPtr data = live_view.completeQuery(std::move(from));
copyData(*data, *output);
auto pipeline = live_view.completeQuery(std::move(from));
pipeline.resize(1);
pipeline.setSinks([&](const Block &, Pipe::StreamType)
{
return std::move(output);
});
auto executor = pipeline.execute();
executor->execute(pipeline.getNumThreads());
}
@ -351,9 +380,11 @@ bool StorageLiveView::getNewBlocks()
/// inserted data to be duplicated
auto new_mergeable_blocks = collectMergeableBlocks(live_view_context);
Pipes from = blocksToPipes(new_mergeable_blocks->blocks, new_mergeable_blocks->sample_block);
BlockInputStreamPtr data = completeQuery(std::move(from));
auto pipeline = completeQuery(std::move(from));
while (Block block = data->read())
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
/// calculate hash before virtual column is added
block.updateHash(hash);
@ -521,7 +552,7 @@ Pipe StorageLiveView::read(
return Pipe(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
}
BlockInputStreams StorageLiveView::watch(
Pipe StorageLiveView::watch(
const Names & /*column_names*/,
const SelectQueryInfo & query_info,
ContextPtr local_context,
@ -533,7 +564,7 @@ BlockInputStreams StorageLiveView::watch(
bool has_limit = false;
UInt64 limit = 0;
BlockInputStreamPtr reader;
Pipe reader;
if (query.limit_length)
{
@ -542,15 +573,15 @@ BlockInputStreams StorageLiveView::watch(
}
if (query.is_watch_events)
reader = std::make_shared<LiveViewEventsBlockInputStream>(
reader = Pipe(std::make_shared<LiveViewEventsSource>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds());
local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds()));
else
reader = std::make_shared<LiveViewBlockInputStream>(
reader = Pipe(std::make_shared<LiveViewSource>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds());
local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds()));
{
std::lock_guard lock(mutex);
@ -563,7 +594,7 @@ BlockInputStreams StorageLiveView::watch(
}
processed_stage = QueryProcessingStage::Complete;
return { reader };
return reader;
}
NamesAndTypesList StorageLiveView::getVirtuals() const

View File

@ -52,9 +52,9 @@ using Pipes = std::vector<Pipe>;
class StorageLiveView final : public shared_ptr_helper<StorageLiveView>, public IStorage, WithContext
{
friend struct shared_ptr_helper<StorageLiveView>;
friend class LiveViewBlockInputStream;
friend class LiveViewEventsBlockInputStream;
friend class LiveViewBlockOutputStream;
friend class LiveViewSource;
friend class LiveViewEventsSource;
friend class LiveViewSink;
public:
~StorageLiveView() override;
@ -153,7 +153,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockInputStreams watch(
Pipe watch(
const Names & column_names,
const SelectQueryInfo & query_info,
ContextPtr context,
@ -167,7 +167,7 @@ public:
/// Collect mergeable blocks and their sample. Must be called holding mutex
MergeableBlocksPtr collectMergeableBlocks(ContextPtr context);
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr completeQuery(Pipes pipes);
QueryPipeline completeQuery(Pipes pipes);
void setMergeableBlocks(MergeableBlocksPtr blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }

View File

@ -40,7 +40,7 @@ public:
return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getInMemoryMetadataPtr(), info);
}
BlockInputStreams watch(
Pipe watch(
const Names & column_names,
const SelectQueryInfo & query_info,
ContextPtr context,