From 8d14f2ef8fd99d4f751f983f8500845902b9dfec Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 4 Aug 2021 20:58:18 +0300 Subject: [PATCH] Streams -> Processors for dicts, part 1. --- src/Dictionaries/CacheDictionary.cpp | 33 ++-- src/Dictionaries/CacheDictionary.h | 2 +- .../CassandraBlockInputStream.cpp | 26 ++-- src/Dictionaries/CassandraBlockInputStream.h | 10 +- .../CassandraDictionarySource.cpp | 22 ++- src/Dictionaries/CassandraDictionarySource.h | 8 +- .../ClickHouseDictionarySource.cpp | 49 +++--- src/Dictionaries/ClickHouseDictionarySource.h | 12 +- .../DictionaryBlockInputStream.cpp | 24 ++- src/Dictionaries/DictionaryBlockInputStream.h | 32 ++-- .../DictionaryBlockInputStreamBase.cpp | 21 +-- .../DictionaryBlockInputStreamBase.h | 10 +- src/Dictionaries/DictionaryHelpers.h | 19 ++- src/Dictionaries/DictionarySourceHelpers.cpp | 60 +++----- src/Dictionaries/DictionarySourceHelpers.h | 17 +-- src/Dictionaries/DirectDictionary.cpp | 37 ++--- src/Dictionaries/DirectDictionary.h | 6 +- .../ExecutableDictionarySource.cpp | 142 ++++++++++-------- src/Dictionaries/ExecutableDictionarySource.h | 12 +- .../ExecutablePoolDictionarySource.cpp | 95 ++++++------ .../ExecutablePoolDictionarySource.h | 12 +- src/Dictionaries/FileDictionarySource.cpp | 11 +- src/Dictionaries/FileDictionarySource.h | 10 +- src/Dictionaries/FlatDictionary.cpp | 35 +++-- src/Dictionaries/FlatDictionary.h | 4 +- src/Dictionaries/HashedDictionary.cpp | 8 +- src/Dictionaries/HashedDictionary.h | 4 +- src/Dictionaries/IDictionary.h | 2 +- src/Dictionaries/IDictionarySource.h | 29 ++-- src/Dictionaries/readInvalidateQuery.cpp | 19 ++- src/Dictionaries/readInvalidateQuery.h | 6 +- 31 files changed, 407 insertions(+), 370 deletions(-) diff --git a/src/Dictionaries/CacheDictionary.cpp b/src/Dictionaries/CacheDictionary.cpp index 8a8a64fab36..a8754691425 100644 --- a/src/Dictionaries/CacheDictionary.cpp +++ b/src/Dictionaries/CacheDictionary.cpp @@ -13,6 +13,9 @@ #include #include +#include +#include + namespace ProfileEvents { extern const Event DictCacheKeysRequested; @@ -481,24 +484,28 @@ MutableColumns CacheDictionary::aggregateColumns( } template -BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const +Pipe CacheDictionary::read(const Names & column_names, size_t max_block_size) const { - std::shared_ptr stream; + Pipe pipe; { /// Write lock on storage const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; if constexpr (dictionary_key_type == DictionaryKeyType::simple) - stream = std::make_shared(shared_from_this(), max_block_size, cache_storage_ptr->getCachedSimpleKeys(), column_names); + pipe = Pipe(std::make_shared( + DictionarySourceData(shared_from_this(), cache_storage_ptr->getCachedSimpleKeys(), column_names), + max_block_size)); else { auto keys = cache_storage_ptr->getCachedComplexKeys(); - stream = std::make_shared(shared_from_this(), max_block_size, keys, column_names); + pipe = Pipe(std::make_shared( + DictionarySourceData(shared_from_this(), keys, column_names), + max_block_size)); } } - return stream; + return pipe; } template @@ -567,21 +574,21 @@ void CacheDictionary::update(CacheDictionaryUpdateUnitPtrloadIds(requested_keys_vector); + pipeline.init(current_source_ptr->loadIds(requested_keys_vector)); else - stream = current_source_ptr->loadKeys(update_unit_ptr->key_columns, requested_complex_key_rows); - - stream->readPrefix(); + pipeline.init(current_source_ptr->loadKeys(update_unit_ptr->key_columns, requested_complex_key_rows)); size_t skip_keys_size_offset = dict_struct.getKeysSize(); PaddedPODArray found_keys_in_source; Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable(); - while (Block block = stream->read()) + PullingPipelineExecutor executor(pipeline); + Block block; + while (executor.pull(block)) { Columns key_columns; key_columns.reserve(skip_keys_size_offset); @@ -625,8 +632,6 @@ void CacheDictionary::update(CacheDictionaryUpdateUnitPtrassumeMutable()); - stream->readSuffix(); - { /// Lock for cache modification ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; @@ -686,4 +691,4 @@ void CacheDictionary::update(CacheDictionaryUpdateUnitPtr; template class CacheDictionary; -} +} \ No newline at end of file diff --git a/src/Dictionaries/CacheDictionary.h b/src/Dictionaries/CacheDictionary.h index baaf99d290b..613d73b0f83 100644 --- a/src/Dictionaries/CacheDictionary.h +++ b/src/Dictionaries/CacheDictionary.h @@ -137,7 +137,7 @@ public: ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override; - BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + Pipe read(const Names & column_names, size_t max_block_size) const override; std::exception_ptr getLastException() const override; diff --git a/src/Dictionaries/CassandraBlockInputStream.cpp b/src/Dictionaries/CassandraBlockInputStream.cpp index 57a4555ea87..384717e2ba2 100644 --- a/src/Dictionaries/CassandraBlockInputStream.cpp +++ b/src/Dictionaries/CassandraBlockInputStream.cpp @@ -22,12 +22,13 @@ namespace ErrorCodes extern const int UNKNOWN_TYPE; } -CassandraBlockInputStream::CassandraBlockInputStream( +CassandraSource::CassandraSource( const CassSessionShared & session_, const String & query_str, const Block & sample_block, size_t max_block_size_) - : session(session_) + : SourceWithProgress(sample_block) + , session(session_) , statement(query_str.c_str(), /*parameters count*/ 0) , max_block_size(max_block_size_) , has_more_pages(cass_true) @@ -36,7 +37,7 @@ CassandraBlockInputStream::CassandraBlockInputStream( cassandraCheck(cass_statement_set_paging_size(statement, max_block_size)); } -void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, const CassValue * cass_value) +void CassandraSource::insertValue(IColumn & column, ValueType type, const CassValue * cass_value) { switch (type) { @@ -148,13 +149,15 @@ void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, co } } -void CassandraBlockInputStream::readPrefix() -{ - result_future = cass_session_execute(*session, statement); -} -Block CassandraBlockInputStream::readImpl() +Chunk CassandraSource::generate() { + if (!is_initialized) + { + result_future = cass_session_execute(*session, statement); + is_initialized = true; + } + if (!has_more_pages) return {}; @@ -194,12 +197,13 @@ Block CassandraBlockInputStream::readImpl() } } - assert(cass_result_row_count(result) == columns.front()->size()); + size_t num_rows = columns.front()->size(); + assert(cass_result_row_count(result) == num_rows); - return description.sample_block.cloneWithColumns(std::move(columns)); + return Chunk(std::move(columns), num_rows); } -void CassandraBlockInputStream::assertTypes(const CassResultPtr & result) +void CassandraSource::assertTypes(const CassResultPtr & result) { if (!assert_types) return; diff --git a/src/Dictionaries/CassandraBlockInputStream.h b/src/Dictionaries/CassandraBlockInputStream.h index 3b0e583e3ad..98adb19fee6 100644 --- a/src/Dictionaries/CassandraBlockInputStream.h +++ b/src/Dictionaries/CassandraBlockInputStream.h @@ -4,17 +4,17 @@ #if USE_CASSANDRA #include -#include +#include #include namespace DB { -class CassandraBlockInputStream final : public IBlockInputStream +class CassandraSource final : public SourceWithProgress { public: - CassandraBlockInputStream( + CassandraSource( const CassSessionShared & session_, const String & query_str, const Block & sample_block, @@ -24,12 +24,11 @@ public: Block getHeader() const override { return description.sample_block.cloneEmpty(); } - void readPrefix() override; private: using ValueType = ExternalResultDescription::ValueType; - Block readImpl() override; + Chunk generate() override; static void insertValue(IColumn & column, ValueType type, const CassValue * cass_value); void assertTypes(const CassResultPtr & result); @@ -40,6 +39,7 @@ private: ExternalResultDescription description; cass_bool_t has_more_pages; bool assert_types = true; + bool is_initialized = false; }; } diff --git a/src/Dictionaries/CassandraDictionarySource.cpp b/src/Dictionaries/CassandraDictionarySource.cpp index 7605b86ef90..8b31b4d6fa2 100644 --- a/src/Dictionaries/CassandraDictionarySource.cpp +++ b/src/Dictionaries/CassandraDictionarySource.cpp @@ -40,7 +40,6 @@ void registerDictionarySourceCassandra(DictionarySourceFactory & factory) #include #include "CassandraBlockInputStream.h" #include -#include namespace DB { @@ -132,12 +131,12 @@ void CassandraDictionarySource::maybeAllowFiltering(String & query) const query += " ALLOW FILTERING;"; } -BlockInputStreamPtr CassandraDictionarySource::loadAll() +Pipe CassandraDictionarySource::loadAll() { String query = query_builder.composeLoadAllQuery(); maybeAllowFiltering(query); LOG_INFO(log, "Loading all using query: {}", query); - return std::make_shared(getSession(), query, sample_block, max_block_size); + return Pipe(std::make_shared(getSession(), query, sample_block, max_block_size)); } std::string CassandraDictionarySource::toString() const @@ -145,15 +144,15 @@ std::string CassandraDictionarySource::toString() const return "Cassandra: " + settings.db + '.' + settings.table; } -BlockInputStreamPtr CassandraDictionarySource::loadIds(const std::vector & ids) +Pipe CassandraDictionarySource::loadIds(const std::vector & ids) { String query = query_builder.composeLoadIdsQuery(ids); maybeAllowFiltering(query); LOG_INFO(log, "Loading ids using query: {}", query); - return std::make_shared(getSession(), query, sample_block, max_block_size); + return Pipe(std::make_shared(getSession(), query, sample_block, max_block_size)); } -BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) +Pipe CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) { if (requested_rows.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No rows requested"); @@ -168,22 +167,19 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu partitions[partition_key.get64()].push_back(row); } - BlockInputStreams streams; + Pipes pipes; for (const auto & partition : partitions) { String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix); maybeAllowFiltering(query); LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query); - streams.push_back(std::make_shared(getSession(), query, sample_block, max_block_size)); + pipes.push_back(Pipe(std::make_shared(getSession(), query, sample_block, max_block_size))); } - if (streams.size() == 1) - return streams.front(); - - return std::make_shared(streams, nullptr, settings.max_threads); + return Pipe::unitePipes(std::move(pipes)); } -BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll() +Pipe CassandraDictionarySource::loadUpdatedAll() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for CassandraDictionarySource"); } diff --git a/src/Dictionaries/CassandraDictionarySource.h b/src/Dictionaries/CassandraDictionarySource.h index c0a4e774d23..871e3dc4857 100644 --- a/src/Dictionaries/CassandraDictionarySource.h +++ b/src/Dictionaries/CassandraDictionarySource.h @@ -49,7 +49,7 @@ public: const String & config_prefix, Block & sample_block); - BlockInputStreamPtr loadAll() override; + Pipe loadAll() override; bool supportsSelectiveLoad() const override { return true; } @@ -62,11 +62,11 @@ public: return std::make_unique(dict_struct, settings, sample_block); } - BlockInputStreamPtr loadIds(const std::vector & ids) override; + Pipe loadIds(const std::vector & ids) override; - BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; + Pipe loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; - BlockInputStreamPtr loadUpdatedAll() override; + Pipe loadUpdatedAll() override; String toString() const override; diff --git a/src/Dictionaries/ClickHouseDictionarySource.cpp b/src/Dictionaries/ClickHouseDictionarySource.cpp index 42ec73ee520..8b2373302c8 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -1,8 +1,11 @@ #include "ClickHouseDictionarySource.h" #include #include -#include -#include +#include +#include +#include +#include +#include #include #include #include @@ -105,29 +108,29 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate() } } -BlockInputStreamPtr ClickHouseDictionarySource::loadAllWithSizeHint(std::atomic * result_size_hint) +Pipe ClickHouseDictionarySource::loadAllWithSizeHint(std::atomic * result_size_hint) { return createStreamForQuery(load_all_query, result_size_hint); } -BlockInputStreamPtr ClickHouseDictionarySource::loadAll() +Pipe ClickHouseDictionarySource::loadAll() { return createStreamForQuery(load_all_query); } -BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll() +Pipe ClickHouseDictionarySource::loadUpdatedAll() { String load_update_query = getUpdateFieldAndDate(); return createStreamForQuery(load_update_query); } -BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector & ids) +Pipe ClickHouseDictionarySource::loadIds(const std::vector & ids) { return createStreamForQuery(query_builder.composeLoadIdsQuery(ids)); } -BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) +Pipe ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) { String query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES); return createStreamForQuery(query); @@ -157,32 +160,41 @@ std::string ClickHouseDictionarySource::toString() const return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where); } -BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic * result_size_hint) +Pipe ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic * result_size_hint) { - BlockInputStreamPtr stream; + QueryPipeline pipeline; /// Sample block should not contain first row default values auto empty_sample_block = sample_block.cloneEmpty(); if (configuration.is_local) { - stream = executeQuery(query, context, true).getInputStream(); - stream = std::make_shared(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position); + pipeline = executeQuery(query, context, true).pipeline; + auto converting = ActionsDAG::makeConvertingActions( + pipeline.getHeader().getColumnsWithTypeAndName(), + empty_sample_block.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position); + + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, std::make_shared(converting)); + }); } else { - stream = std::make_shared(pool, query, empty_sample_block, context); + pipeline.init(Pipe(std::make_shared( + std::make_shared(pool, query, empty_sample_block, context), false, false))); } if (result_size_hint) { - stream->setProgressCallback([result_size_hint](const Progress & progress) + pipeline.setProgressCallback([result_size_hint](const Progress & progress) { *result_size_hint += progress.total_rows_to_read; }); } - return stream; + return QueryPipeline::getPipe(std::move(pipeline)); } std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const @@ -191,15 +203,16 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re if (configuration.is_local) { auto query_context = Context::createCopy(context); - auto input_block = executeQuery(request, query_context, true).getInputStream(); - return readInvalidateQuery(*input_block); + auto pipe = QueryPipeline::getPipe(executeQuery(request, query_context, true).pipeline); + return readInvalidateQuery(std::move(pipe)); } else { /// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result. Block invalidate_sample_block; - RemoteBlockInputStream invalidate_stream(pool, request, invalidate_sample_block, context); - return readInvalidateQuery(invalidate_stream); + Pipe pipe(std::make_shared( + std::make_shared(pool, request, invalidate_sample_block, context), false, false)); + return readInvalidateQuery(std::move(pipe)); } } diff --git a/src/Dictionaries/ClickHouseDictionarySource.h b/src/Dictionaries/ClickHouseDictionarySource.h index fe37610b9c4..f293c010ec3 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.h +++ b/src/Dictionaries/ClickHouseDictionarySource.h @@ -44,15 +44,15 @@ public: ClickHouseDictionarySource(const ClickHouseDictionarySource & other); ClickHouseDictionarySource & operator=(const ClickHouseDictionarySource &) = delete; - BlockInputStreamPtr loadAllWithSizeHint(std::atomic * result_size_hint) override; + Pipe loadAllWithSizeHint(std::atomic * result_size_hint) override; - BlockInputStreamPtr loadAll() override; + Pipe loadAll() override; - BlockInputStreamPtr loadUpdatedAll() override; + Pipe loadUpdatedAll() override; - BlockInputStreamPtr loadIds(const std::vector & ids) override; + Pipe loadIds(const std::vector & ids) override; - BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; + Pipe loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; bool isModified() const override; bool supportsSelectiveLoad() const override { return true; } @@ -70,7 +70,7 @@ public: private: std::string getUpdateFieldAndDate(); - BlockInputStreamPtr createStreamForQuery(const String & query, std::atomic * result_size_hint = nullptr); + Pipe createStreamForQuery(const String & query, std::atomic * result_size_hint = nullptr); std::string doInvalidateQuery(const std::string & request) const; diff --git a/src/Dictionaries/DictionaryBlockInputStream.cpp b/src/Dictionaries/DictionaryBlockInputStream.cpp index c902a87cfe3..fedde8bd886 100644 --- a/src/Dictionaries/DictionaryBlockInputStream.cpp +++ b/src/Dictionaries/DictionaryBlockInputStream.cpp @@ -8,9 +8,9 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -DictionaryBlockInputStream::DictionaryBlockInputStream( - std::shared_ptr dictionary_, UInt64 max_block_size_, PaddedPODArray && ids_, const Names & column_names_) - : DictionaryBlockInputStreamBase(ids_.size(), max_block_size_) +DictionarySourceData::DictionarySourceData( + std::shared_ptr dictionary_, PaddedPODArray && ids_, const Names & column_names_) + : num_rows(ids_.size()) , dictionary(dictionary_) , column_names(column_names_) , ids(std::move(ids_)) @@ -18,12 +18,11 @@ DictionaryBlockInputStream::DictionaryBlockInputStream( { } -DictionaryBlockInputStream::DictionaryBlockInputStream( +DictionarySourceData::DictionarySourceData( std::shared_ptr dictionary_, - UInt64 max_block_size_, const PaddedPODArray & keys, const Names & column_names_) - : DictionaryBlockInputStreamBase(keys.size(), max_block_size_) + : num_rows(keys.size()) , dictionary(dictionary_) , column_names(column_names_) , key_type(DictionaryInputStreamKeyType::ComplexKey) @@ -32,14 +31,13 @@ DictionaryBlockInputStream::DictionaryBlockInputStream( fillKeyColumns(keys, 0, keys.size(), dictionary_structure, key_columns); } -DictionaryBlockInputStream::DictionaryBlockInputStream( +DictionarySourceData::DictionarySourceData( std::shared_ptr dictionary_, - UInt64 max_block_size_, const Columns & data_columns_, const Names & column_names_, GetColumnsFunction && get_key_columns_function_, GetColumnsFunction && get_view_columns_function_) - : DictionaryBlockInputStreamBase(data_columns_.front()->size(), max_block_size_) + : num_rows(data_columns_.front()->size()) , dictionary(dictionary_) , column_names(column_names_) , data_columns(data_columns_) @@ -49,7 +47,7 @@ DictionaryBlockInputStream::DictionaryBlockInputStream( { } -Block DictionaryBlockInputStream::getBlock(size_t start, size_t length) const +Block DictionarySourceData::getBlock(size_t start, size_t length) const { /// TODO: Rewrite switch (key_type) @@ -98,7 +96,7 @@ Block DictionaryBlockInputStream::getBlock(size_t start, size_t length) const throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected DictionaryInputStreamKeyType."); } -Block DictionaryBlockInputStream::fillBlock( +Block DictionarySourceData::fillBlock( const PaddedPODArray & ids_to_fill, const Columns & keys, const DataTypes & types, @@ -161,14 +159,14 @@ Block DictionaryBlockInputStream::fillBlock( return Block(block_columns); } -ColumnPtr DictionaryBlockInputStream::getColumnFromIds(const PaddedPODArray & ids_to_fill) +ColumnPtr DictionarySourceData::getColumnFromIds(const PaddedPODArray & ids_to_fill) { auto column_vector = ColumnVector::create(); column_vector->getData().assign(ids_to_fill); return column_vector; } -void DictionaryBlockInputStream::fillKeyColumns( +void DictionarySourceData::fillKeyColumns( const PaddedPODArray & keys, size_t start, size_t size, diff --git a/src/Dictionaries/DictionaryBlockInputStream.h b/src/Dictionaries/DictionaryBlockInputStream.h index 7692c910b94..c15406487e2 100644 --- a/src/Dictionaries/DictionaryBlockInputStream.h +++ b/src/Dictionaries/DictionaryBlockInputStream.h @@ -20,18 +20,16 @@ namespace DB /* BlockInputStream implementation for external dictionaries * read() returns blocks consisting of the in-memory contents of the dictionaries */ -class DictionaryBlockInputStream : public DictionaryBlockInputStreamBase +class DictionarySourceData { public: - DictionaryBlockInputStream( + DictionarySourceData( std::shared_ptr dictionary, - UInt64 max_block_size, PaddedPODArray && ids, const Names & column_names); - DictionaryBlockInputStream( + DictionarySourceData( std::shared_ptr dictionary, - UInt64 max_block_size, const PaddedPODArray & keys, const Names & column_names); @@ -41,18 +39,15 @@ public: // Calls get_key_columns_function to get key column for dictionary get function call // and get_view_columns_function to get key representation. // Now used in trie dictionary, where columns are stored as ip and mask, and are showed as string - DictionaryBlockInputStream( + DictionarySourceData( std::shared_ptr dictionary, - UInt64 max_block_size, const Columns & data_columns, const Names & column_names, GetColumnsFunction && get_key_columns_function, GetColumnsFunction && get_view_columns_function); - String getName() const override { return "Dictionary"; } - -protected: - Block getBlock(size_t start, size_t length) const override; + Block getBlock(size_t start, size_t length) const; + size_t getNumRows() const { return num_rows; } private: Block fillBlock( @@ -70,6 +65,7 @@ private: const DictionaryStructure & dictionary_structure, ColumnsWithTypeAndName & result); + const size_t num_rows; std::shared_ptr dictionary; Names column_names; PaddedPODArray ids; @@ -89,4 +85,18 @@ private: DictionaryInputStreamKeyType key_type; }; +class DictionarySource final : public DictionarySourceBase +{ +public: + DictionarySource(DictionarySourceData data_, UInt64 max_block_size) + : DictionarySourceBase(data_.getBlock(0, 0), data_.getNumRows(), max_block_size) + , data(std::move(data_)) + {} + + String getName() const override { return "DictionarySource"; } + Block getBlock(size_t start, size_t length) const override { return data.getBlock(start, length); } + + DictionarySourceData data; +}; + } diff --git a/src/Dictionaries/DictionaryBlockInputStreamBase.cpp b/src/Dictionaries/DictionaryBlockInputStreamBase.cpp index 3a3fd09220f..d5e6e8a1cda 100644 --- a/src/Dictionaries/DictionaryBlockInputStreamBase.cpp +++ b/src/Dictionaries/DictionaryBlockInputStreamBase.cpp @@ -2,25 +2,20 @@ namespace DB { -DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count_, size_t max_block_size_) - : rows_count(rows_count_), max_block_size(max_block_size_) +DictionarySourceBase::DictionarySourceBase(const Block & header, size_t rows_count_, size_t max_block_size_) + : SourceWithProgress(header), rows_count(rows_count_), max_block_size(max_block_size_) { } -Block DictionaryBlockInputStreamBase::readImpl() +Chunk DictionarySourceBase::generate() { if (next_row == rows_count) - return Block(); + return {}; - size_t block_size = std::min(max_block_size, rows_count - next_row); - Block block = getBlock(next_row, block_size); - next_row += block_size; - return block; -} - -Block DictionaryBlockInputStreamBase::getHeader() const -{ - return getBlock(0, 0); + size_t size = std::min(max_block_size, rows_count - next_row); + auto chunk = getChunk(next_row, size); + next_row += size; + return chunk; } } diff --git a/src/Dictionaries/DictionaryBlockInputStreamBase.h b/src/Dictionaries/DictionaryBlockInputStreamBase.h index fb99918aed8..1533cba82f6 100644 --- a/src/Dictionaries/DictionaryBlockInputStreamBase.h +++ b/src/Dictionaries/DictionaryBlockInputStreamBase.h @@ -1,24 +1,22 @@ #pragma once -#include +#include namespace DB { -class DictionaryBlockInputStreamBase : public IBlockInputStream +class DictionarySourceBase : public SourceWithProgress { protected: - DictionaryBlockInputStreamBase(size_t rows_count_, size_t max_block_size_); + DictionarySourceBase(const Block & header, size_t rows_count_, size_t max_block_size_); virtual Block getBlock(size_t start, size_t length) const = 0; - Block getHeader() const override; - private: const size_t rows_count; const size_t max_block_size; size_t next_row = 0; - Block readImpl() override; + Chunk generate() override; }; } diff --git a/src/Dictionaries/DictionaryHelpers.h b/src/Dictionaries/DictionaryHelpers.h index ed124ce1e0a..79459057822 100644 --- a/src/Dictionaries/DictionaryHelpers.h +++ b/src/Dictionaries/DictionaryHelpers.h @@ -15,7 +15,8 @@ #include #include #include -#include +#include +#include namespace DB @@ -501,10 +502,10 @@ private: * Note: readPrefix readImpl readSuffix will be called on stream object during function execution. */ template -void mergeBlockWithStream( +void mergeBlockWithPipe( size_t key_columns_size, Block & block_to_update, - BlockInputStreamPtr & stream) + Pipe pipe) { using KeyType = std::conditional_t; static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by updatePreviousyLoadedBlockWithStream"); @@ -555,9 +556,13 @@ void mergeBlockWithStream( auto result_fetched_columns = block_to_update.cloneEmptyColumns(); - stream->readPrefix(); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); - while (Block block = stream->read()) + PullingPipelineExecutor executor(pipeline); + Block block; + + while (executor.pull(block)) { Columns block_key_columns; block_key_columns.reserve(key_columns_size); @@ -591,8 +596,6 @@ void mergeBlockWithStream( } } - stream->readSuffix(); - size_t result_fetched_rows = result_fetched_columns.front()->size(); size_t filter_hint = filter.size() - indexes_to_remove_count; @@ -645,4 +648,4 @@ static const PaddedPODArray & getColumnVectorData( } } -} +} \ No newline at end of file diff --git a/src/Dictionaries/DictionarySourceHelpers.cpp b/src/Dictionaries/DictionarySourceHelpers.cpp index 54ed07092d3..79d4d2e5376 100644 --- a/src/Dictionaries/DictionarySourceHelpers.cpp +++ b/src/Dictionaries/DictionarySourceHelpers.cpp @@ -85,62 +85,44 @@ ContextMutablePtr copyContextAndApplySettings( return local_context; } - -BlockInputStreamWithAdditionalColumns::BlockInputStreamWithAdditionalColumns( - Block block_to_add_, std::unique_ptr && stream_) - : block_to_add(std::move(block_to_add_)) - , stream(std::move(stream_)) +static Block transformHeader(Block header, Block block_to_add) { -} - -Block BlockInputStreamWithAdditionalColumns::getHeader() const -{ - auto header = stream->getHeader(); - - if (header) - { - for (Int64 i = static_cast(block_to_add.columns() - 1); i >= 0; --i) - header.insert(0, block_to_add.getByPosition(i).cloneEmpty()); - } + for (Int64 i = static_cast(block_to_add.columns() - 1); i >= 0; --i) + header.insert(0, block_to_add.getByPosition(i).cloneEmpty()); return header; } -Block BlockInputStreamWithAdditionalColumns::readImpl() +TransformWithAdditionalColumns::TransformWithAdditionalColumns( + Block block_to_add_, const Block & header) + : ISimpleTransform(header, transformHeader(header, block_to_add_), true) + , block_to_add(std::move(block_to_add_)) { - auto block = stream->read(); +} - if (block) +void TransformWithAdditionalColumns::transform(Chunk & chunk) +{ + if (chunk) { - auto block_rows = block.rows(); + auto num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); - auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows); + auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, num_rows); - if (cut_block.rows() != block_rows) + if (cut_block.rows() != num_rows) throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Number of rows in block to add after cut must equal to number of rows in block from inner stream"); for (Int64 i = static_cast(cut_block.columns() - 1); i >= 0; --i) - block.insert(0, cut_block.getByPosition(i)); + columns.insert(columns.begin(), cut_block.getByPosition(i).column); - current_range_index += block_rows; + current_range_index += num_rows; + chunk.setColumns(std::move(columns), num_rows); } - - return block; } -void BlockInputStreamWithAdditionalColumns::readPrefix() +String TransformWithAdditionalColumns::getName() const { - stream->readPrefix(); -} - -void BlockInputStreamWithAdditionalColumns::readSuffix() -{ - stream->readSuffix(); -} - -String BlockInputStreamWithAdditionalColumns::getName() const -{ - return "BlockInputStreamWithAdditionalColumns"; -} + return "TransformWithAdditionalColumns"; } +} \ No newline at end of file diff --git a/src/Dictionaries/DictionarySourceHelpers.h b/src/Dictionaries/DictionarySourceHelpers.h index 6fed4c7181c..ba05cf9ebc9 100644 --- a/src/Dictionaries/DictionarySourceHelpers.h +++ b/src/Dictionaries/DictionarySourceHelpers.h @@ -4,7 +4,7 @@ #include #include -#include +#include #include #include #include @@ -38,25 +38,18 @@ ContextMutablePtr copyContextAndApplySettings( * * block_to_add rows size must be equal to final sum rows size of all inner stream blocks. */ -class BlockInputStreamWithAdditionalColumns final : public IBlockInputStream +class TransformWithAdditionalColumns final : public ISimpleTransform { public: - BlockInputStreamWithAdditionalColumns(Block block_to_add_, std::unique_ptr && stream_); + TransformWithAdditionalColumns(Block block_to_add_, const Block & header); - Block getHeader() const override; - - Block readImpl() override; - - void readPrefix() override; - - void readSuffix() override; + void transform(Chunk & chunk) override; String getName() const override; private: Block block_to_add; - std::unique_ptr stream; size_t current_range_index = 0; }; -} +} \ No newline at end of file diff --git a/src/Dictionaries/DirectDictionary.cpp b/src/Dictionaries/DirectDictionary.cpp index c9b38acfbb5..afdd9bee92c 100644 --- a/src/Dictionaries/DirectDictionary.cpp +++ b/src/Dictionaries/DirectDictionary.cpp @@ -8,6 +8,8 @@ #include #include +#include +#include namespace DB { @@ -66,11 +68,13 @@ Columns DirectDictionary::getColumns( size_t dictionary_keys_size = dict_struct.getKeysNames().size(); block_key_columns.reserve(dictionary_keys_size); - BlockInputStreamPtr stream = getSourceBlockInputStream(key_columns, requested_keys); + QueryPipeline pipeline; + pipeline.init(getSourceBlockInputStream(key_columns, requested_keys)); - stream->readPrefix(); + PullingPipelineExecutor executor(pipeline); - while (const auto block = stream->read()) + Block block; + while (executor.pull(block)) { /// Split into keys columns and attribute columns for (size_t i = 0; i < dictionary_keys_size; ++i) @@ -98,8 +102,6 @@ Columns DirectDictionary::getColumns( block_key_columns.clear(); } - stream->readSuffix(); - Field value_to_insert; size_t requested_keys_size = requested_keys.size(); @@ -183,13 +185,14 @@ ColumnUInt8::Ptr DirectDictionary::hasKeys( size_t dictionary_keys_size = dict_struct.getKeysNames().size(); block_key_columns.reserve(dictionary_keys_size); - BlockInputStreamPtr stream = getSourceBlockInputStream(key_columns, requested_keys); + QueryPipeline pipeline; + pipeline.init(getSourceBlockInputStream(key_columns, requested_keys)); - stream->readPrefix(); + PullingPipelineExecutor executor(pipeline); size_t keys_found = 0; - - while (const auto block = stream->read()) + Block block; + while (executor.pull(block)) { /// Split into keys columns and attribute columns for (size_t i = 0; i < dictionary_keys_size; ++i) @@ -216,8 +219,6 @@ ColumnUInt8::Ptr DirectDictionary::hasKeys( block_key_columns.clear(); } - stream->readSuffix(); - query_count.fetch_add(requested_keys_size, std::memory_order_relaxed); found_count.fetch_add(keys_found, std::memory_order_relaxed); @@ -260,13 +261,13 @@ ColumnUInt8::Ptr DirectDictionary::isInHierarchy( } template -BlockInputStreamPtr DirectDictionary::getSourceBlockInputStream( +Pipe DirectDictionary::getSourceBlockInputStream( const Columns & key_columns [[maybe_unused]], const PaddedPODArray & requested_keys [[maybe_unused]]) const { size_t requested_keys_size = requested_keys.size(); - BlockInputStreamPtr stream; + Pipe pipe; if constexpr (dictionary_key_type == DictionaryKeyType::simple) { @@ -276,7 +277,7 @@ BlockInputStreamPtr DirectDictionary::getSourceBlockInputSt for (auto key : requested_keys) ids.emplace_back(key); - stream = source_ptr->loadIds(ids); + pipe = source_ptr->loadIds(ids); } else { @@ -285,14 +286,14 @@ BlockInputStreamPtr DirectDictionary::getSourceBlockInputSt for (size_t i = 0; i < requested_keys_size; ++i) requested_rows.emplace_back(i); - stream = source_ptr->loadKeys(key_columns, requested_rows); + pipe = source_ptr->loadKeys(key_columns, requested_rows); } - return stream; + return pipe; } template -BlockInputStreamPtr DirectDictionary::getBlockInputStream(const Names & /* column_names */, size_t /* max_block_size */) const +Pipe DirectDictionary::read(const Names & /* column_names */, size_t /* max_block_size */) const { return source_ptr->loadAll(); } @@ -353,4 +354,4 @@ void registerDictionaryDirect(DictionaryFactory & factory) } -} +} \ No newline at end of file diff --git a/src/Dictionaries/DirectDictionary.h b/src/Dictionaries/DirectDictionary.h index a4e8d5f82e6..841590164f6 100644 --- a/src/Dictionaries/DirectDictionary.h +++ b/src/Dictionaries/DirectDictionary.h @@ -97,10 +97,10 @@ public: ColumnPtr in_key_column, const DataTypePtr & key_type) const override; - BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + Pipe read(const Names & column_names, size_t max_block_size) const override; private: - BlockInputStreamPtr getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray & requested_keys) const; + Pipe getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray & requested_keys) const; const DictionaryStructure dict_struct; const DictionarySourcePtr source_ptr; @@ -113,4 +113,4 @@ private: extern template class DirectDictionary; extern template class DirectDictionary; -} +} \ No newline at end of file diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index daf79965428..82b74d8196e 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -2,13 +2,17 @@ #include #include -#include #include #include +#include +#include +#include +#include +#include #include +#include #include #include -#include #include #include #include @@ -34,26 +38,34 @@ namespace ErrorCodes namespace { /// Owns ShellCommand and calls wait for it. - class ShellCommandOwningBlockInputStream : public OwningBlockInputStream + class ShellCommandOwningTransform final : public ISimpleTransform { private: Poco::Logger * log; + std::unique_ptr command; public: - ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr command_) - : OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_) + ShellCommandOwningTransform(const Block & header, Poco::Logger * log_, std::unique_ptr command_) + : ISimpleTransform(header, header, true), log(log_), command(std::move(command_)) { } - void readSuffix() override + String getName() const override { return "ShellCommandOwningTransform"; } + void transform(Chunk &) override {} + + Status prepare() override { - OwningBlockInputStream::readSuffix(); + auto status = ISimpleTransform::prepare(); + if (status == Status::Finished) + { + std::string err; + readStringUntilEOF(err, command->err); + if (!err.empty()) + LOG_ERROR(log, "Having stderr: {}", err); - std::string err; - readStringUntilEOF(err, own->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); + command->wait(); + } - own->wait(); + return status; } }; @@ -94,18 +106,19 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar { } -BlockInputStreamPtr ExecutableDictionarySource::loadAll() +Pipe ExecutableDictionarySource::loadAll() { if (configuration.implicit_key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method"); LOG_TRACE(log, "loadAll {}", toString()); auto process = ShellCommand::execute(configuration.command); - auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size); - return std::make_shared(log, input_stream, std::move(process)); + Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size)); + pipe.addTransform(std::make_shared(pipe.getHeader(), log, std::move(process))); + return pipe; } -BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() +Pipe ExecutableDictionarySource::loadUpdatedAll() { if (configuration.implicit_key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method"); @@ -119,81 +132,86 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field); auto process = ShellCommand::execute(command_with_update_field); - auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size); - return std::make_shared(log, input_stream, std::move(process)); + + Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size)); + pipe.addTransform(std::make_shared(pipe.getHeader(), log, std::move(process))); + return pipe; } namespace { /** A stream, that runs child process and sends data to its stdin in background thread, * and receives data from its stdout. + * + * TODO: implement without background thread. */ - class BlockInputStreamWithBackgroundThread final : public IBlockInputStream + class SourceWithBackgroundThread final : public SourceWithProgress { public: - BlockInputStreamWithBackgroundThread( + SourceWithBackgroundThread( ContextPtr context, const std::string & format, const Block & sample_block, const std::string & command_str, Poco::Logger * log_, std::function && send_data_) - : log(log_), - command(ShellCommand::execute(command_str)), - send_data(std::move(send_data_)), - thread([this] { send_data(command->in); }) + : SourceWithProgress(sample_block) + , log(log_) + , command(ShellCommand::execute(command_str)) + , send_data(std::move(send_data_)) + , thread([this] { send_data(command->in); }) { - stream = context->getInputFormat(format, command->out, sample_block, max_block_size); + pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, max_block_size))); + executor = std::make_unique(pipeline); } - ~BlockInputStreamWithBackgroundThread() override + ~SourceWithBackgroundThread() override { if (thread.joinable()) thread.join(); } - Block getHeader() const override + protected: + Chunk generate() override { - return stream->getHeader(); + Chunk chunk; + executor->pull(chunk); + return chunk; } - private: - Block readImpl() override + public: + Status prepare() override { - return stream->read(); + auto status = SourceWithProgress::prepare(); + + if (status == Status::Finished) + { + std::string err; + readStringUntilEOF(err, command->err); + if (!err.empty()) + LOG_ERROR(log, "Having stderr: {}", err); + + if (thread.joinable()) + thread.join(); + + command->wait(); + } + + return status; } - void readPrefix() override - { - stream->readPrefix(); - } - - void readSuffix() override - { - stream->readSuffix(); - - std::string err; - readStringUntilEOF(err, command->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - if (thread.joinable()) - thread.join(); - - command->wait(); - } - - String getName() const override { return "WithBackgroundThread"; } + String getName() const override { return "SourceWithBackgroundThread"; } Poco::Logger * log; - BlockInputStreamPtr stream; + QueryPipeline pipeline; + std::unique_ptr executor; std::unique_ptr command; std::function send_data; ThreadFromGlobalPool thread; }; } -BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector & ids) +Pipe ExecutableDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); @@ -201,7 +219,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector & requested_rows) +Pipe ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) { LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); @@ -209,21 +227,21 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col return getStreamForBlock(block); } -BlockInputStreamPtr ExecutableDictionarySource::getStreamForBlock(const Block & block) +Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block) { - auto stream = std::make_unique( + Pipe pipe(std::make_unique( context, configuration.format, sample_block, configuration.command, log, [block, this](WriteBufferFromFile & out) mutable { auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty()); formatBlock(output_stream, block); out.close(); - }); + })); if (configuration.implicit_key) - return std::make_shared(block, std::move(stream)); - else - return std::shared_ptr(stream.release()); + pipe.addTransform(std::make_shared(block, pipe.getHeader())); + + return pipe; } bool ExecutableDictionarySource::isModified() const @@ -289,4 +307,4 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory) factory.registerSource("executable", create_table_source); } -} +} \ No newline at end of file diff --git a/src/Dictionaries/ExecutableDictionarySource.h b/src/Dictionaries/ExecutableDictionarySource.h index 0b92023df36..084b8c13c25 100644 --- a/src/Dictionaries/ExecutableDictionarySource.h +++ b/src/Dictionaries/ExecutableDictionarySource.h @@ -36,17 +36,17 @@ public: ExecutableDictionarySource(const ExecutableDictionarySource & other); ExecutableDictionarySource & operator=(const ExecutableDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + Pipe loadAll() override; /** The logic of this method is flawed, absolutely incorrect and ignorant. * It may lead to skipping some values due to clock sync or timezone changes. * The intended usage of "update_field" is totally different. */ - BlockInputStreamPtr loadUpdatedAll() override; + Pipe loadUpdatedAll() override; - BlockInputStreamPtr loadIds(const std::vector & ids) override; + Pipe loadIds(const std::vector & ids) override; - BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; + Pipe loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; bool isModified() const override; @@ -58,7 +58,7 @@ public: std::string toString() const override; - BlockInputStreamPtr getStreamForBlock(const Block & block); + Pipe getStreamForBlock(const Block & block); private: Poco::Logger * log; @@ -69,4 +69,4 @@ private: ContextPtr context; }; -} +} \ No newline at end of file diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.cpp b/src/Dictionaries/ExecutablePoolDictionarySource.cpp index 9eacda343cf..2cd567bbc02 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.cpp +++ b/src/Dictionaries/ExecutablePoolDictionarySource.cpp @@ -2,12 +2,15 @@ #include #include -#include +#include +#include +#include #include #include #include #include -#include +#include +#include #include #include #include @@ -69,12 +72,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP { } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll() +Pipe ExecutablePoolDictionarySource::loadAll() { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadAll method"); } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll() +Pipe ExecutablePoolDictionarySource::loadUpdatedAll() { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadUpdatedAll method"); } @@ -84,19 +87,19 @@ namespace /** A stream, that runs child process and sends data to its stdin in background thread, * and receives data from its stdout. */ - class PoolBlockInputStreamWithBackgroundThread final : public IBlockInputStream + class PoolSourceWithBackgroundThread final : public SourceWithProgress { public: - PoolBlockInputStreamWithBackgroundThread( + PoolSourceWithBackgroundThread( std::shared_ptr process_pool_, std::unique_ptr && command_, - BlockInputStreamPtr && stream_, + Pipe pipe, size_t read_rows_, Poco::Logger * log_, std::function && send_data_) - : process_pool(process_pool_) + : SourceWithProgress(pipe.getHeader()) + , process_pool(process_pool_) , command(std::move(command_)) - , stream(std::move(stream_)) , rows_to_read(read_rows_) , log(log_) , send_data(std::move(send_data_)) @@ -112,9 +115,12 @@ namespace exception_during_read = std::current_exception(); } }) - {} + { + pipeline.init(std::move(pipe)); + executor = std::make_unique(pipeline); + } - ~PoolBlockInputStreamWithBackgroundThread() override + ~PoolSourceWithBackgroundThread() override { if (thread.joinable()) thread.join(); @@ -123,25 +129,22 @@ namespace process_pool->returnObject(std::move(command)); } - Block getHeader() const override - { - return stream->getHeader(); - } - - private: - Block readImpl() override + protected: + Chunk generate() override { rethrowExceptionDuringReadIfNeeded(); if (current_read_rows == rows_to_read) - return Block(); + return {}; - Block block; + Chunk chunk; try { - block = stream->read(); - current_read_rows += block.rows(); + if (!executor->pull(chunk)) + return {}; + + current_read_rows += chunk.getNumRows(); } catch (...) { @@ -150,22 +153,23 @@ namespace throw; } - return block; + return chunk; } - void readPrefix() override + public: + Status prepare() override { - rethrowExceptionDuringReadIfNeeded(); - stream->readPrefix(); - } + auto status = SourceWithProgress::prepare(); - void readSuffix() override - { - if (thread.joinable()) - thread.join(); + if (status == Status::Finished) + { + if (thread.joinable()) + thread.join(); - rethrowExceptionDuringReadIfNeeded(); - stream->readSuffix(); + rethrowExceptionDuringReadIfNeeded(); + } + + return status; } void rethrowExceptionDuringReadIfNeeded() @@ -182,7 +186,8 @@ namespace std::shared_ptr process_pool; std::unique_ptr command; - BlockInputStreamPtr stream; + QueryPipeline pipeline; + std::unique_ptr executor; size_t rows_to_read; Poco::Logger * log; std::function send_data; @@ -194,7 +199,7 @@ namespace } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadIds(const std::vector & ids) +Pipe ExecutablePoolDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); @@ -202,7 +207,7 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::loadIds(const std::vector & requested_rows) +Pipe ExecutablePoolDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) { LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); @@ -210,7 +215,7 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::loadKeys(const Columns & key return getStreamForBlock(block); } -BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Block & block) +Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block) { std::unique_ptr process; bool result = process_pool->tryBorrowObject(process, [this]() @@ -227,20 +232,20 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Bloc configuration.max_command_execution_time); size_t rows_to_read = block.rows(); - auto read_stream = context->getInputFormat(configuration.format, process->out, sample_block, rows_to_read); + auto format = FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, rows_to_read); - auto stream = std::make_unique( - process_pool, std::move(process), std::move(read_stream), rows_to_read, log, + Pipe pipe(std::make_unique( + process_pool, std::move(process), Pipe(std::move(format)), rows_to_read, log, [block, this](WriteBufferFromFile & out) mutable { auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty()); formatBlock(output_stream, block); - }); + })); if (configuration.implicit_key) - return std::make_shared(block, std::move(stream)); - else - return std::shared_ptr(stream.release()); + pipe.addTransform(std::make_shared(block, pipe.getHeader())); + + return pipe; } bool ExecutablePoolDictionarySource::isModified() const @@ -320,4 +325,4 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory) factory.registerSource("executable_pool", create_table_source); } -} +} \ No newline at end of file diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.h b/src/Dictionaries/ExecutablePoolDictionarySource.h index 02b0288a52e..9c8730632f7 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.h +++ b/src/Dictionaries/ExecutablePoolDictionarySource.h @@ -48,17 +48,17 @@ public: ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other); ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + Pipe loadAll() override; /** The logic of this method is flawed, absolutely incorrect and ignorant. * It may lead to skipping some values due to clock sync or timezone changes. * The intended usage of "update_field" is totally different. */ - BlockInputStreamPtr loadUpdatedAll() override; + Pipe loadUpdatedAll() override; - BlockInputStreamPtr loadIds(const std::vector & ids) override; + Pipe loadIds(const std::vector & ids) override; - BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; + Pipe loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; bool isModified() const override; @@ -70,7 +70,7 @@ public: std::string toString() const override; - BlockInputStreamPtr getStreamForBlock(const Block & block); + Pipe getStreamForBlock(const Block & block); private: Poco::Logger * log; @@ -83,4 +83,4 @@ private: std::shared_ptr process_pool; }; -} +} \ No newline at end of file diff --git a/src/Dictionaries/FileDictionarySource.cpp b/src/Dictionaries/FileDictionarySource.cpp index 239c13e71c2..3766da0a28d 100644 --- a/src/Dictionaries/FileDictionarySource.cpp +++ b/src/Dictionaries/FileDictionarySource.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "DictionarySourceFactory.h" #include "DictionaryStructure.h" #include "registerDictionaries.h" @@ -45,14 +47,15 @@ FileDictionarySource::FileDictionarySource(const FileDictionarySource & other) } -BlockInputStreamPtr FileDictionarySource::loadAll() +Pipe FileDictionarySource::loadAll() { LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString()); auto in_ptr = std::make_unique(filepath); - auto stream = context->getInputFormat(format, *in_ptr, sample_block, max_block_size); + auto source = FormatFactory::instance().getInput(format, *in_ptr, sample_block, context, max_block_size); + source->addBuffer(std::move(in_ptr)); last_modification = getLastModification(); - return std::make_shared>(stream, std::move(in_ptr)); + return Pipe(std::move(source)); } @@ -92,4 +95,4 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory) factory.registerSource("file", create_table_source); } -} +} \ No newline at end of file diff --git a/src/Dictionaries/FileDictionarySource.h b/src/Dictionaries/FileDictionarySource.h index ffc29374f4f..efc86be8ba9 100644 --- a/src/Dictionaries/FileDictionarySource.h +++ b/src/Dictionaries/FileDictionarySource.h @@ -21,19 +21,19 @@ public: FileDictionarySource(const FileDictionarySource & other); - BlockInputStreamPtr loadAll() override; + Pipe loadAll() override; - BlockInputStreamPtr loadUpdatedAll() override + Pipe loadUpdatedAll() override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for FileDictionarySource"); } - BlockInputStreamPtr loadIds(const std::vector & /*ids*/) override + Pipe loadIds(const std::vector & /*ids*/) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadIds is unsupported for FileDictionarySource"); } - BlockInputStreamPtr loadKeys(const Columns & /*key_columns*/, const std::vector & /*requested_rows*/) override + Pipe loadKeys(const Columns & /*key_columns*/, const std::vector & /*requested_rows*/) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadKeys is unsupported for FileDictionarySource"); } @@ -65,4 +65,4 @@ private: Poco::Timestamp last_modification; }; -} +} \ No newline at end of file diff --git a/src/Dictionaries/FlatDictionary.cpp b/src/Dictionaries/FlatDictionary.cpp index abef2335ffd..40afe70f504 100644 --- a/src/Dictionaries/FlatDictionary.cpp +++ b/src/Dictionaries/FlatDictionary.cpp @@ -10,6 +10,9 @@ #include #include +#include +#include + #include #include #include @@ -319,10 +322,12 @@ void FlatDictionary::updateData() { if (!update_field_loaded_block || update_field_loaded_block->rows() == 0) { - auto stream = source_ptr->loadUpdatedAll(); - stream->readPrefix(); + QueryPipeline pipeline; + pipeline.init(source_ptr->loadUpdatedAll()); - while (const auto block = stream->read()) + PullingPipelineExecutor executor(pipeline); + Block block; + while (executor.pull(block)) { /// We are using this to keep saved data if input stream consists of multiple blocks if (!update_field_loaded_block) @@ -335,15 +340,14 @@ void FlatDictionary::updateData() saved_column->insertRangeFrom(update_column, 0, update_column.size()); } } - stream->readSuffix(); } else { - auto stream = source_ptr->loadUpdatedAll(); - mergeBlockWithStream( + Pipe pipe(source_ptr->loadUpdatedAll()); + mergeBlockWithPipe( dict_struct.getKeysSize(), *update_field_loaded_block, - stream); + std::move(pipe)); } if (update_field_loaded_block) @@ -354,13 +358,13 @@ void FlatDictionary::loadData() { if (!source_ptr->hasUpdateField()) { - auto stream = source_ptr->loadAll(); - stream->readPrefix(); + QueryPipeline pipeline; + pipeline.init(source_ptr->loadAll()); + PullingPipelineExecutor executor(pipeline); - while (const auto block = stream->read()) + Block block; + while (executor.pull(block)) blockToAttributes(block); - - stream->readSuffix(); } else updateData(); @@ -531,7 +535,7 @@ void FlatDictionary::setAttributeValue(Attribute & attribute, const UInt64 key, callOnDictionaryAttributeType(attribute.type, type_call); } -BlockInputStreamPtr FlatDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const +Pipe FlatDictionary::read(const Names & column_names, size_t max_block_size) const { const auto keys_count = loaded_keys.size(); @@ -542,7 +546,8 @@ BlockInputStreamPtr FlatDictionary::getBlockInputStream(const Names & column_nam if (loaded_keys[key_index]) keys.push_back(key_index); - return std::make_shared(shared_from_this(), max_block_size, std::move(keys), column_names); + return Pipe(std::make_shared( + DictionarySourceData(shared_from_this(), std::move(keys), column_names), max_block_size)); } void registerDictionaryFlat(DictionaryFactory & factory) @@ -586,4 +591,4 @@ void registerDictionaryFlat(DictionaryFactory & factory) } -} +} \ No newline at end of file diff --git a/src/Dictionaries/FlatDictionary.h b/src/Dictionaries/FlatDictionary.h index ccd3bf9d9eb..c16547bd3b4 100644 --- a/src/Dictionaries/FlatDictionary.h +++ b/src/Dictionaries/FlatDictionary.h @@ -97,7 +97,7 @@ public: const DataTypePtr & key_type, size_t level) const override; - BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + Pipe read(const Names & column_names, size_t max_block_size) const override; private: template @@ -178,4 +178,4 @@ private: BlockPtr update_field_loaded_block; }; -} +} \ No newline at end of file diff --git a/src/Dictionaries/HashedDictionary.cpp b/src/Dictionaries/HashedDictionary.cpp index d65338b9a4b..33e06de23bf 100644 --- a/src/Dictionaries/HashedDictionary.cpp +++ b/src/Dictionaries/HashedDictionary.cpp @@ -637,7 +637,7 @@ void HashedDictionary::calculateBytesAllocated() } template -BlockInputStreamPtr HashedDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const +Pipe HashedDictionary::read(const Names & column_names, size_t max_block_size) const { PaddedPODArray keys; @@ -667,9 +667,9 @@ BlockInputStreamPtr HashedDictionary::getBlockInput } if constexpr (dictionary_key_type == DictionaryKeyType::simple) - return std::make_shared(shared_from_this(), max_block_size, std::move(keys), column_names); + return Pipe(std::make_shared(DictionarySourceData(shared_from_this(), std::move(keys), column_names), max_block_size)); else - return std::make_shared(shared_from_this(), max_block_size, keys, column_names); + return Pipe(std::make_shared(DictionarySourceData(shared_from_this(), keys, column_names), max_block_size)); } template @@ -767,4 +767,4 @@ void registerDictionaryHashed(DictionaryFactory & factory) } -} +} \ No newline at end of file diff --git a/src/Dictionaries/HashedDictionary.h b/src/Dictionaries/HashedDictionary.h index 842e49aa8f0..82e8a91b603 100644 --- a/src/Dictionaries/HashedDictionary.h +++ b/src/Dictionaries/HashedDictionary.h @@ -116,7 +116,7 @@ public: const DataTypePtr & key_type, size_t level) const override; - BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + Pipe read(const Names & column_names, size_t max_block_size) const override; private: template @@ -225,4 +225,4 @@ extern template class HashedDictionary; extern template class HashedDictionary; extern template class HashedDictionary; -} +} \ No newline at end of file diff --git a/src/Dictionaries/IDictionary.h b/src/Dictionaries/IDictionary.h index 5467a673503..f9e0223a698 100644 --- a/src/Dictionaries/IDictionary.h +++ b/src/Dictionaries/IDictionary.h @@ -191,7 +191,7 @@ struct IDictionary : public IExternalLoadable getDictionaryID().getNameForLogs()); } - virtual BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const = 0; + virtual Pipe read(const Names & column_names, size_t max_block_size) const = 0; bool supportUpdates() const override { return true; } diff --git a/src/Dictionaries/IDictionarySource.h b/src/Dictionaries/IDictionarySource.h index 42b35c95062..661f5b8eeb8 100644 --- a/src/Dictionaries/IDictionarySource.h +++ b/src/Dictionaries/IDictionarySource.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include @@ -21,28 +21,30 @@ class IDictionarySource { public: - /// Returns an input stream with all the data available from this source. - virtual BlockInputStreamPtr loadAll() = 0; + /// Returns a pipe with all the data available from this source. + virtual Pipe loadAll() = 0; - /// Returns an input stream with updated data available from this source. - virtual BlockInputStreamPtr loadUpdatedAll() = 0; + /// Returns a pipe with updated data available from this source. + virtual Pipe loadUpdatedAll() = 0; /** * result_size_hint - approx number of rows in the stream. - * Returns an input stream with all the data available from this source. + * Returns a pipe with all the data available from this source. * * NOTE: result_size_hint may be changed during you are reading (usually it * will be non zero for the first block and zero for others, since it uses - * Progress::total_rows_approx,) from the input stream, and may be called + * Progress::total_rows_approx,) from the pipe, and may be called * in parallel, so you should use something like this: * * ... * std::atomic new_size = 0; * - * auto stream = source->loadAll(&new_size); - * stream->readPrefix(); + * QueryPipeline pipeline; + * pipeline.init(source->loadAll(&new_size)); + * PullingPipelineExecutor executor; * - * while (const auto block = stream->read()) + * Block block; + * while (executor.pull(block)) * { * if (new_size) * { @@ -56,10 +58,9 @@ public: * } * } * - * stream->readSuffix(); * ... */ - virtual BlockInputStreamPtr loadAllWithSizeHint(std::atomic * /* result_size_hint */) + virtual Pipe loadAllWithSizeHint(std::atomic * /* result_size_hint */) { return loadAll(); } @@ -72,13 +73,13 @@ public: /** Returns an input stream with the data for a collection of identifiers. * It must be guaranteed, that 'ids' array will live at least until all data will be read from returned stream. */ - virtual BlockInputStreamPtr loadIds(const std::vector & ids) = 0; + virtual Pipe loadIds(const std::vector & ids) = 0; /** Returns an input stream with the data for a collection of composite keys. * `requested_rows` contains indices of all rows containing unique keys. * It must be guaranteed, that 'requested_rows' array will live at least until all data will be read from returned stream. */ - virtual BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) = 0; + virtual Pipe loadKeys(const Columns & key_columns, const std::vector & requested_rows) = 0; /// indicates whether the source has been modified since last load* operation virtual bool isModified() const = 0; diff --git a/src/Dictionaries/readInvalidateQuery.cpp b/src/Dictionaries/readInvalidateQuery.cpp index 4664b61bfc4..bd1ec0e0983 100644 --- a/src/Dictionaries/readInvalidateQuery.cpp +++ b/src/Dictionaries/readInvalidateQuery.cpp @@ -1,5 +1,6 @@ #include "readInvalidateQuery.h" -#include +#include +#include #include #include @@ -14,11 +15,18 @@ namespace ErrorCodes extern const int RECEIVED_EMPTY_DATA; } -std::string readInvalidateQuery(IBlockInputStream & block_input_stream) +std::string readInvalidateQuery(Pipe pipe) { - block_input_stream.readPrefix(); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); + + PullingPipelineExecutor executor(pipeline); + + Block block; + while (executor.pull(block)) + if (block) + break; - Block block = block_input_stream.read(); if (!block) throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Empty response"); @@ -36,11 +44,10 @@ std::string readInvalidateQuery(IBlockInputStream & block_input_stream) auto & column_type = block.getByPosition(0); column_type.type->getDefaultSerialization()->serializeTextQuoted(*column_type.column->convertToFullColumnIfConst(), 0, out, FormatSettings()); - while ((block = block_input_stream.read())) + while (executor.pull(block)) if (block.rows() > 0) throw Exception(ErrorCodes::TOO_MANY_ROWS, "Expected single row in resultset, got at least {}", std::to_string(rows + 1)); - block_input_stream.readSuffix(); return out.str(); } diff --git a/src/Dictionaries/readInvalidateQuery.h b/src/Dictionaries/readInvalidateQuery.h index 0a259e1ec24..61d5b29dc89 100644 --- a/src/Dictionaries/readInvalidateQuery.h +++ b/src/Dictionaries/readInvalidateQuery.h @@ -1,13 +1,13 @@ #pragma once -#include - #include namespace DB { +class Pipe; + /// Using in MySQLDictionarySource and XDBCDictionarySource after processing invalidate_query. -std::string readInvalidateQuery(IBlockInputStream & block_input_stream); +std::string readInvalidateQuery(Pipe pipe); }