From 96b5ef84592d590d41e140ef80f36e994cf3d82a Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 14 Feb 2020 13:57:09 +0300 Subject: [PATCH] Processors support for StorageStripeLog reading. --- dbms/src/Storages/StorageStripeLog.cpp | 69 ++++++++++++++++---------- dbms/src/Storages/StorageStripeLog.h | 4 +- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp index 5aa3b52f969..02de666807f 100644 --- a/dbms/src/Storages/StorageStripeLog.cpp +++ b/dbms/src/Storages/StorageStripeLog.cpp @@ -28,6 +28,9 @@ #include #include +#include +#include +#include namespace DB @@ -45,35 +48,46 @@ namespace ErrorCodes } -class StripeLogBlockInputStream final : public IBlockInputStream +class StripeLogSource final : public SourceWithProgress { public: - StripeLogBlockInputStream(StorageStripeLog & storage_, size_t max_read_buffer_size_, + + static Block getHeader( + StorageStripeLog & storage, + const Names & column_names, + IndexForNativeFormat::Blocks::const_iterator index_begin, + IndexForNativeFormat::Blocks::const_iterator index_end) + { + if (index_begin == index_end) + return storage.getSampleBlockForColumns(column_names); + + /// TODO: check if possible to always return storage.getSampleBlock() + + Block header; + + for (const auto & column : index_begin->columns) + { + auto type = DataTypeFactory::instance().get(column.type); + header.insert(ColumnWithTypeAndName{ type, column.name }); + } + + return header; + } + + StripeLogSource(StorageStripeLog & storage_, const Names & column_names, size_t max_read_buffer_size_, std::shared_ptr & index_, IndexForNativeFormat::Blocks::const_iterator index_begin_, IndexForNativeFormat::Blocks::const_iterator index_end_) - : storage(storage_), max_read_buffer_size(max_read_buffer_size_), - index(index_), index_begin(index_begin_), index_end(index_end_) + : SourceWithProgress(getHeader(storage_, column_names, index_begin_, index_end_)) + , storage(storage_), max_read_buffer_size(max_read_buffer_size_) + , index(index_), index_begin(index_begin_), index_end(index_end_) { - if (index_begin != index_end) - { - for (const auto & column : index_begin->columns) - { - auto type = DataTypeFactory::instance().get(column.type); - header.insert(ColumnWithTypeAndName{ type, column.name }); - } - } } String getName() const override { return "StripeLog"; } - Block getHeader() const override - { - return header; - } - protected: - Block readImpl() override + Chunk generate() override { Block res; start(); @@ -91,7 +105,7 @@ protected: } } - return res; + return Chunk(res.getColumns(), res.rows()); } private: @@ -235,7 +249,7 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Strin } -BlockInputStreams StorageStripeLog::read( +Pipes StorageStripeLog::readWithProcessors( const Names & column_names, const SelectQueryInfo & /*query_info*/, const Context & context, @@ -249,15 +263,18 @@ BlockInputStreams StorageStripeLog::read( NameSet column_names_set(column_names.begin(), column_names.end()); + Pipes pipes; + String index_file = table_path + "index.mrk"; if (!disk->exists(index_file)) - return { std::make_shared(getSampleBlockForColumns(column_names)) }; + { + pipes.emplace_back(std::make_shared(getSampleBlockForColumns(column_names))); + return pipes; + } CompressedReadBufferFromFile index_in(fullPath(disk, index_file), 0, 0, 0, INDEX_BUFFER_SIZE); std::shared_ptr index{std::make_shared(index_in, column_names_set)}; - BlockInputStreams res; - size_t size = index->blocks.size(); if (num_streams > size) num_streams = size; @@ -270,13 +287,13 @@ BlockInputStreams StorageStripeLog::read( std::advance(begin, stream * size / num_streams); std::advance(end, (stream + 1) * size / num_streams); - res.emplace_back(std::make_shared( - *this, context.getSettingsRef().max_read_buffer_size, index, begin, end)); + pipes.emplace_back(std::make_shared( + *this, column_names, context.getSettingsRef().max_read_buffer_size, index, begin, end)); } /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change. - return res; + return pipes; } diff --git a/dbms/src/Storages/StorageStripeLog.h b/dbms/src/Storages/StorageStripeLog.h index d87e3f9170a..0c394363807 100644 --- a/dbms/src/Storages/StorageStripeLog.h +++ b/dbms/src/Storages/StorageStripeLog.h @@ -18,14 +18,14 @@ namespace DB */ class StorageStripeLog : public ext::shared_ptr_helper, public IStorage { - friend class StripeLogBlockInputStream; + friend class StripeLogSource; friend class StripeLogBlockOutputStream; friend struct ext::shared_ptr_helper; public: String getName() const override { return "StripeLog"; } - BlockInputStreams read( + Pipes readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context,