diff --git a/dbms/src/Storages/StorageJoin.h b/dbms/src/Storages/StorageJoin.h index 4815f53c7e6..4c6bc3b9bf2 100644 --- a/dbms/src/Storages/StorageJoin.h +++ b/dbms/src/Storages/StorageJoin.h @@ -44,6 +44,8 @@ public: size_t max_block_size, unsigned num_streams) override; + bool supportProcessorsPipeline() const override { return true; } + private: Block sample_block; const Names key_names; diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 44c3af28267..24d798e955a 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -20,6 +20,8 @@ #include #include +#include +#include #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" @@ -41,13 +43,25 @@ namespace ErrorCodes } -class LogBlockInputStream final : public IBlockInputStream +class LogSource final : public SourceWithProgress { public: - LogBlockInputStream( + + static Block getHeader(const NamesAndTypesList & columns) + { + Block res; + + for (const auto & name_type : columns) + res.insert({ name_type.type->createColumn(), name_type.type, name_type.name }); + + return Nested::flatten(res); + } + + LogSource( size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_, size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_) - : block_size(block_size_), + : SourceWithProgress(getHeader(columns_)), + block_size(block_size_), columns(columns_), storage(storage_), mark_number(mark_number_), @@ -58,18 +72,8 @@ public: String getName() const override { return "Log"; } - Block getHeader() const override - { - Block res; - - for (const auto & name_type : columns) - res.insert({ name_type.type->createColumn(), name_type.type, name_type.name }); - - return Nested::flatten(res); - } - protected: - Block readImpl() override; + Chunk generate() override; private: size_t block_size; @@ -181,15 +185,15 @@ private: }; -Block LogBlockInputStream::readImpl() +Chunk LogSource::generate() { Block res; if (rows_read == rows_limit) - return res; + return {}; if (storage.disk->isDirectoryEmpty(storage.table_path)) - return res; + return {}; /// How many rows to read for the next block. size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read); @@ -208,7 +212,7 @@ Block LogBlockInputStream::readImpl() throw; } - if (column->size()) + if (!column->empty()) res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name)); } @@ -224,11 +228,13 @@ Block LogBlockInputStream::readImpl() streams.clear(); } - return Nested::flatten(res); + res = Nested::flatten(res); + UInt64 num_rows = res.rows(); + return Chunk(res.getColumns(), num_rows); } -void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read) +void LogSource::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read) { IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint. @@ -564,7 +570,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const return it->second.marks; } -BlockInputStreams StorageLog::read( +Pipes StorageLog::readWithProcessors( const Names & column_names, const SelectQueryInfo & /*query_info*/, const Context & context, @@ -579,7 +585,7 @@ BlockInputStreams StorageLog::read( std::shared_lock lock(rwlock); - BlockInputStreams res; + Pipes pipes; const Marks & marks = getMarksWithRealRowCount(); size_t marks_size = marks.size(); @@ -597,7 +603,7 @@ BlockInputStreams StorageLog::read( size_t rows_begin = mark_begin ? marks[mark_begin - 1].rows : 0; size_t rows_end = mark_end ? marks[mark_end - 1].rows : 0; - res.emplace_back(std::make_shared( + pipes.emplace_back(std::make_shared( max_block_size, all_columns, *this, @@ -606,7 +612,7 @@ BlockInputStreams StorageLog::read( max_read_buffer_size)); } - return res; + return pipes; } BlockOutputStreamPtr StorageLog::write( diff --git a/dbms/src/Storages/StorageLog.h b/dbms/src/Storages/StorageLog.h index 696196e4069..e19fb8df07b 100644 --- a/dbms/src/Storages/StorageLog.h +++ b/dbms/src/Storages/StorageLog.h @@ -17,14 +17,14 @@ namespace DB */ class StorageLog : public ext::shared_ptr_helper, public IStorage { - friend class LogBlockInputStream; + friend class LogSource; friend class LogBlockOutputStream; friend struct ext::shared_ptr_helper; public: String getName() const override { return "Log"; } - BlockInputStreams read( + Pipes readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, @@ -32,6 +32,8 @@ public: size_t max_block_size, unsigned num_streams) override; + bool supportProcessorsPipeline() const override { return true; } + BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; void rename(