diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index e5d4d952a0c..3589176f231 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -261,7 +261,6 @@ BlockIO InterpreterInsertQuery::execute() { InterpreterWatchQuery interpreter_watch{ query.watch, getContext() }; res = interpreter_watch.execute(); - res.pipeline.init(Pipe(std::make_shared(std::move(res.in)))); } for (size_t i = 0; i < out_streams_size; i++) diff --git a/src/Interpreters/InterpreterWatchQuery.cpp b/src/Interpreters/InterpreterWatchQuery.cpp index edf0f37c00e..ee96045bbc4 100644 --- a/src/Interpreters/InterpreterWatchQuery.cpp +++ b/src/Interpreters/InterpreterWatchQuery.cpp @@ -71,10 +71,9 @@ BlockIO InterpreterWatchQuery::execute() QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; /// Watch storage - streams = storage->watch(required_columns, query_info, getContext(), from_stage, max_block_size, max_streams); + auto pipe = storage->watch(required_columns, query_info, getContext(), from_stage, max_block_size, max_streams); /// Constraints on the result, the quota on the result, and also callback for progress. - if (IBlockInputStream * stream = dynamic_cast(streams[0].get())) { StreamLocalLimits limits; limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048 @@ -82,11 +81,11 @@ BlockIO InterpreterWatchQuery::execute() limits.size_limits.max_bytes = settings.max_result_bytes; limits.size_limits.overflow_mode = settings.result_overflow_mode; - stream->setLimits(limits); - stream->setQuota(getContext()->getQuota()); + pipe.setLimits(limits); + pipe.setQuota(getContext()->getQuota()); } - res.in = streams[0]; + res.pipeline.init(std::move(pipe)); return res; } diff --git a/src/Processors/Transforms/SquashingChunksTransform.cpp b/src/Processors/Transforms/SquashingChunksTransform.cpp new file mode 100644 index 00000000000..398ce9eb9fb --- /dev/null +++ b/src/Processors/Transforms/SquashingChunksTransform.cpp @@ -0,0 +1,27 @@ +#include + +namespace DB +{ + +SquashingChunksTransform::SquashingChunksTransform( + const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory) + : IAccumulatingTransform(header, header) + , squashing(min_block_size_rows, min_block_size_bytes, reserve_memory) +{ +} + +void SquashingChunksTransform::consume(Chunk chunk) +{ + if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) + { + setReadyChunk(Chunk(block.getColumns(), block.rows())); + } +} + +Chunk SquashingChunksTransform::generate() +{ + auto block = squashing.add({}); + return Chunk(block.getColumns(), block.rows()); +} + +} diff --git a/src/Processors/Transforms/SquashingChunksTransform.h b/src/Processors/Transforms/SquashingChunksTransform.h new file mode 100644 index 00000000000..bcacf5abcda --- /dev/null +++ b/src/Processors/Transforms/SquashingChunksTransform.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace DB +{ + +class SquashingChunksTransform : public IAccumulatingTransform +{ +public: + explicit SquashingChunksTransform( + const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory = false); + + String getName() const override { return "SquashingTransform"; } + +protected: + void consume(Chunk chunk) override; + Chunk generate() override; + +private: + SquashingTransform squashing; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 543a08caca5..50faac6d97b 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -165,6 +165,7 @@ SRCS( Transforms/ReverseTransform.cpp Transforms/RollupTransform.cpp Transforms/SortingTransform.cpp + Transforms/SquashingChunksTransform.cpp Transforms/TotalsHavingTransform.cpp Transforms/WindowTransform.cpp Transforms/getSourceFromFromASTInsertQuery.cpp diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 2180f92df98..85bfbfb1f84 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -264,7 +264,7 @@ public: * * It is guaranteed that the structure of the table will not change over the lifetime of the returned streams (that is, there will not be ALTER, RENAME and DROP). */ - virtual BlockInputStreams watch( + virtual Pipe watch( const Names & /*column_names*/, const SelectQueryInfo & /*query_info*/, ContextPtr /*context*/, diff --git a/src/Storages/LiveView/LiveViewEventsBlockInputStream.h b/src/Storages/LiveView/LiveViewEventsSource.h similarity index 90% rename from src/Storages/LiveView/LiveViewEventsBlockInputStream.h rename to src/Storages/LiveView/LiveViewEventsSource.h index dc6848ec20c..daf9edfef95 100644 --- a/src/Storages/LiveView/LiveViewEventsBlockInputStream.h +++ b/src/Storages/LiveView/LiveViewEventsSource.h @@ -16,7 +16,7 @@ limitations under the License. */ #include #include #include -#include +#include #include @@ -27,7 +27,7 @@ namespace DB * Keeps stream alive by outputting blocks with no rows * based on period specified by the heartbeat interval. */ -class LiveViewEventsBlockInputStream : public IBlockInputStream +class LiveViewEventsSource : public SourceWithProgress { using NonBlockingResult = std::pair; @@ -35,13 +35,14 @@ using NonBlockingResult = std::pair; public: /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update /// and LIMIT 0 just returns data without waiting for any updates - LiveViewEventsBlockInputStream(std::shared_ptr storage_, + LiveViewEventsSource(std::shared_ptr storage_, std::shared_ptr blocks_ptr_, std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, const UInt64 heartbeat_interval_sec_) - : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), + : SourceWithProgress({ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), "version")}), + storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), @@ -51,22 +52,17 @@ public: active = active_ptr.lock(); } - String getName() const override { return "LiveViewEventsBlockInputStream"; } + String getName() const override { return "LiveViewEventsSource"; } - void cancel(bool kill) override + void onCancel() override { if (isCancelled() || storage->shutdown_called) return; - IBlockInputStream::cancel(kill); + std::lock_guard lock(storage->mutex); storage->condition.notify_all(); } - Block getHeader() const override - { - return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), "version")}; - } - void refresh() { if (active && blocks && it == end) @@ -109,10 +105,11 @@ public: return res; } protected: - Block readImpl() override + Chunk generate() override { /// try reading - return tryReadImpl(true).first; + auto block = tryReadImpl(true).first; + return Chunk(block.getColumns(), block.rows()); } /** tryRead method attempts to read a block in either blocking @@ -170,7 +167,7 @@ protected: if (!end_of_blocks) { end_of_blocks = true; - return { getHeader(), true }; + return { getPort().getHeader(), true }; } while (true) { @@ -192,7 +189,7 @@ protected: { // repeat the event block as a heartbeat last_event_timestamp_usec = static_cast(timestamp.epochMicroseconds()); - return { getHeader(), true }; + return { getPort().getHeader(), true }; } } } diff --git a/src/Storages/LiveView/LiveViewBlockOutputStream.h b/src/Storages/LiveView/LiveViewSink.h similarity index 74% rename from src/Storages/LiveView/LiveViewBlockOutputStream.h rename to src/Storages/LiveView/LiveViewSink.h index 6b8a5a2cb9e..433a5554152 100644 --- a/src/Storages/LiveView/LiveViewBlockOutputStream.h +++ b/src/Storages/LiveView/LiveViewSink.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include @@ -9,19 +9,28 @@ namespace DB { -class LiveViewBlockOutputStream : public IBlockOutputStream +class LiveViewSink : public SinkToStorage { -public: - explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {} + /// _version column is added manually in sink. + static Block updateHeader(Block block) + { + block.erase("_version"); + return block; + } - void writePrefix() override +public: + explicit LiveViewSink(StorageLiveView & storage_) : SinkToStorage(updateHeader(storage_.getHeader())), storage(storage_) {} + + String getName() const override { return "LiveViewSink"; } + + void onStart() override { new_blocks = std::make_shared(); new_blocks_metadata = std::make_shared(); new_hash = std::make_shared(); } - void writeSuffix() override + void onFinish() override { UInt128 key; String key_str; @@ -65,14 +74,13 @@ public: new_hash.reset(); } - void write(const Block & block) override + void consume(Chunk chunk) override { - new_blocks->push_back(block); + auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns()); block.updateHash(*new_hash); + new_blocks->push_back(std::move(block)); } - Block getHeader() const override { return storage.getHeader(); } - private: using SipHashPtr = std::shared_ptr; diff --git a/src/Storages/LiveView/LiveViewBlockInputStream.h b/src/Storages/LiveView/LiveViewSource.h similarity index 89% rename from src/Storages/LiveView/LiveViewBlockInputStream.h rename to src/Storages/LiveView/LiveViewSource.h index 737e76754c5..af07d8558ad 100644 --- a/src/Storages/LiveView/LiveViewBlockInputStream.h +++ b/src/Storages/LiveView/LiveViewSource.h @@ -1,6 +1,7 @@ #pragma once -#include +#include +#include namespace DB @@ -10,19 +11,20 @@ namespace DB * Keeps stream alive by outputting blocks with no rows * based on period specified by the heartbeat interval. */ -class LiveViewBlockInputStream : public IBlockInputStream +class LiveViewSource : public SourceWithProgress { using NonBlockingResult = std::pair; public: - LiveViewBlockInputStream(std::shared_ptr storage_, + LiveViewSource(std::shared_ptr storage_, std::shared_ptr blocks_ptr_, std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, const UInt64 heartbeat_interval_sec_) - : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), + : SourceWithProgress(storage_->getHeader()) + , storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), @@ -34,17 +36,15 @@ public: String getName() const override { return "LiveViewBlockInputStream"; } - void cancel(bool kill) override + void onCancel() override { if (isCancelled() || storage->shutdown_called) return; - IBlockInputStream::cancel(kill); + std::lock_guard lock(storage->mutex); storage->condition.notify_all(); } - Block getHeader() const override { return storage->getHeader(); } - void refresh() { if (active && blocks && it == end) @@ -74,10 +74,11 @@ public: } protected: - Block readImpl() override + Chunk generate() override { /// try reading - return tryReadImpl(true).first; + auto block = tryReadImpl(true).first; + return Chunk(block.getColumns(), block.rows()); } /** tryRead method attempts to read a block in either blocking @@ -135,7 +136,7 @@ protected: if (!end_of_blocks) { end_of_blocks = true; - return { getHeader(), true }; + return { getPort().getHeader(), true }; } while (true) { @@ -157,7 +158,7 @@ protected: { // heartbeat last_event_timestamp_usec = static_cast(Poco::Timestamp().epochMicroseconds()); - return { getHeader(), true }; + return { getPort().getHeader(), true }; } } } diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index 5f5ce8a4a37..69390850ccc 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -15,10 +15,11 @@ limitations under the License. */ #include #include #include -#include #include -#include -#include +#include +#include +#include +#include #include #include #include @@ -26,9 +27,9 @@ limitations under the License. */ #include #include -#include -#include -#include +#include +#include +#include #include #include @@ -110,15 +111,23 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont InterpreterSelectQuery interpreter(mergeable_query->clone(), local_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); - auto view_mergeable_stream = std::make_shared(interpreter.execute().getInputStream()); + auto io = interpreter.execute(); + io.pipeline.addSimpleTransform([&](const Block & cur_header) + { + return std::make_shared(cur_header); + }); - while (Block this_block = view_mergeable_stream->read()) + new_mergeable_blocks->sample_block = io.pipeline.getHeader(); + + PullingPipelineExecutor executor(io.pipeline); + Block this_block; + + while (executor.pull(this_block)) base_blocks->push_back(this_block); new_blocks->push_back(base_blocks); new_mergeable_blocks->blocks = new_blocks; - new_mergeable_blocks->sample_block = view_mergeable_stream->getHeader(); return new_mergeable_blocks; } @@ -133,7 +142,7 @@ Pipes StorageLiveView::blocksToPipes(BlocksPtrs blocks, Block & sample_block) } /// Complete query using input streams from mergeable blocks -BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes) +QueryPipeline StorageLiveView::completeQuery(Pipes pipes) { //FIXME it's dangerous to create Context on stack auto block_context = Context::createCopy(getContext()); @@ -147,18 +156,25 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes) std::move(pipes), QueryProcessingStage::WithMergeableState); }; block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(getContext(), creator)); - InterpreterSelectQuery select(getInnerBlocksQuery(), block_context, StoragePtr(), nullptr, SelectQueryOptions(QueryProcessingStage::Complete)); - BlockInputStreamPtr data = std::make_shared(select.execute().getInputStream()); + auto io = select.execute(); + io.pipeline.addSimpleTransform([&](const Block & cur_header) + { + return std::make_shared(cur_header); + }); /// Squashing is needed here because the view query can generate a lot of blocks /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY /// and two-level aggregation is triggered). - data = std::make_shared( - data, getContext()->getSettingsRef().min_insert_block_size_rows, - getContext()->getSettingsRef().min_insert_block_size_bytes); + io.pipeline.addSimpleTransform([&](const Block & cur_header) + { + return std::make_shared( + cur_header, + getContext()->getSettingsRef().min_insert_block_size_rows, + getContext()->getSettingsRef().min_insert_block_size_bytes); + }); - return data; + return std::move(io.pipeline); } void StorageLiveView::writeIntoLiveView( @@ -166,7 +182,7 @@ void StorageLiveView::writeIntoLiveView( const Block & block, ContextPtr local_context) { - BlockOutputStreamPtr output = std::make_shared(live_view); + auto output = std::make_shared(live_view); /// Check if live view has any readers if not /// just reset blocks to empty and do nothing else @@ -220,10 +236,16 @@ void StorageLiveView::writeIntoLiveView( InterpreterSelectQuery select_block(mergeable_query, local_context, blocks_storage.getTable(), blocks_storage.getTable()->getInMemoryMetadataPtr(), QueryProcessingStage::WithMergeableState); - auto data_mergeable_stream = std::make_shared( - select_block.execute().getInputStream()); + auto io = select_block.execute(); + io.pipeline.addSimpleTransform([&](const Block & cur_header) + { + return std::make_shared(cur_header); + }); - while (Block this_block = data_mergeable_stream->read()) + PullingPipelineExecutor executor(io.pipeline); + Block this_block; + + while (executor.pull(this_block)) new_mergeable_blocks->push_back(this_block); if (new_mergeable_blocks->empty()) @@ -238,8 +260,15 @@ void StorageLiveView::writeIntoLiveView( } } - BlockInputStreamPtr data = live_view.completeQuery(std::move(from)); - copyData(*data, *output); + auto pipeline = live_view.completeQuery(std::move(from)); + pipeline.resize(1); + pipeline.setSinks([&](const Block &, Pipe::StreamType) + { + return std::move(output); + }); + + auto executor = pipeline.execute(); + executor->execute(pipeline.getNumThreads()); } @@ -351,9 +380,11 @@ bool StorageLiveView::getNewBlocks() /// inserted data to be duplicated auto new_mergeable_blocks = collectMergeableBlocks(live_view_context); Pipes from = blocksToPipes(new_mergeable_blocks->blocks, new_mergeable_blocks->sample_block); - BlockInputStreamPtr data = completeQuery(std::move(from)); + auto pipeline = completeQuery(std::move(from)); - while (Block block = data->read()) + PullingPipelineExecutor executor(pipeline); + Block block; + while (executor.pull(block)) { /// calculate hash before virtual column is added block.updateHash(hash); @@ -521,7 +552,7 @@ Pipe StorageLiveView::read( return Pipe(std::make_shared(blocks_ptr, getHeader())); } -BlockInputStreams StorageLiveView::watch( +Pipe StorageLiveView::watch( const Names & /*column_names*/, const SelectQueryInfo & query_info, ContextPtr local_context, @@ -533,7 +564,7 @@ BlockInputStreams StorageLiveView::watch( bool has_limit = false; UInt64 limit = 0; - BlockInputStreamPtr reader; + Pipe reader; if (query.limit_length) { @@ -542,15 +573,15 @@ BlockInputStreams StorageLiveView::watch( } if (query.is_watch_events) - reader = std::make_shared( + reader = Pipe(std::make_shared( std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, - local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds()); + local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds())); else - reader = std::make_shared( + reader = Pipe(std::make_shared( std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, - local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds()); + local_context->getSettingsRef().live_view_heartbeat_interval.totalSeconds())); { std::lock_guard lock(mutex); @@ -563,7 +594,7 @@ BlockInputStreams StorageLiveView::watch( } processed_stage = QueryProcessingStage::Complete; - return { reader }; + return reader; } NamesAndTypesList StorageLiveView::getVirtuals() const diff --git a/src/Storages/LiveView/StorageLiveView.h b/src/Storages/LiveView/StorageLiveView.h index 23a9c84cb9e..15afc642989 100644 --- a/src/Storages/LiveView/StorageLiveView.h +++ b/src/Storages/LiveView/StorageLiveView.h @@ -52,9 +52,9 @@ using Pipes = std::vector; class StorageLiveView final : public shared_ptr_helper, public IStorage, WithContext { friend struct shared_ptr_helper; -friend class LiveViewBlockInputStream; -friend class LiveViewEventsBlockInputStream; -friend class LiveViewBlockOutputStream; +friend class LiveViewSource; +friend class LiveViewEventsSource; +friend class LiveViewSink; public: ~StorageLiveView() override; @@ -153,7 +153,7 @@ public: size_t max_block_size, unsigned num_streams) override; - BlockInputStreams watch( + Pipe watch( const Names & column_names, const SelectQueryInfo & query_info, ContextPtr context, @@ -167,7 +167,7 @@ public: /// Collect mergeable blocks and their sample. Must be called holding mutex MergeableBlocksPtr collectMergeableBlocks(ContextPtr context); /// Complete query using input streams from mergeable blocks - BlockInputStreamPtr completeQuery(Pipes pipes); + QueryPipeline completeQuery(Pipes pipes); void setMergeableBlocks(MergeableBlocksPtr blocks) { mergeable_blocks = blocks; } std::shared_ptr getActivePtr() { return active_ptr; } diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index 521a2b8d642..c81ef6febdc 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -40,7 +40,7 @@ public: return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getInMemoryMetadataPtr(), info); } - BlockInputStreams watch( + Pipe watch( const Names & column_names, const SelectQueryInfo & query_info, ContextPtr context,