diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 337c18a5980..ca52c15d289 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -322,7 +322,7 @@ public: private: friend class NonJoinedBlockInputStream; - friend class JoinBlockInputStream; + friend class JoinSource; std::shared_ptr table_join; ASTTableJoin::Kind kind; diff --git a/dbms/src/Storages/StorageJoin.cpp b/dbms/src/Storages/StorageJoin.cpp index f18859ae90a..2f664416b93 100644 --- a/dbms/src/Storages/StorageJoin.cpp +++ b/dbms/src/Storages/StorageJoin.cpp @@ -14,6 +14,8 @@ #include /// toLower #include +#include +#include namespace DB @@ -235,11 +237,15 @@ size_t rawSize(const StringRef & t) return t.size; } -class JoinBlockInputStream : public IBlockInputStream +class JoinSource : public SourceWithProgress { public: - JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_) - : parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_)) + JoinSource(const Join & parent_, UInt64 max_block_size_, Block sample_block_) + : SourceWithProgress(sample_block_) + , parent(parent_) + , lock(parent.data->rwlock) + , max_block_size(max_block_size_) + , sample_block(std::move(sample_block_)) { columns.resize(sample_block.columns()); column_indices.resize(sample_block.columns()); @@ -263,20 +269,17 @@ public: String getName() const override { return "Join"; } - Block getHeader() const override { return sample_block; } - - protected: - Block readImpl() override + Chunk generate() override { if (parent.data->blocks.empty()) - return Block(); + return {}; - Block block; + Chunk chunk; if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, - [&](auto kind, auto strictness, auto & map) { block = createBlock(map); })) + [&](auto kind, auto strictness, auto & map) { chunk = createChunk(map); })) throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR); - return block; + return chunk; } private: @@ -294,7 +297,7 @@ private: template - Block createBlock(const Maps & maps) + Chunk createChunk(const Maps & maps) { for (size_t i = 0; i < sample_block.columns(); ++i) { @@ -305,7 +308,7 @@ private: if (key_pos == i) { // unwrap null key column - ColumnNullable & nullable_col = assert_cast(*columns[i]); + auto & nullable_col = assert_cast(*columns[i]); columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable(); } else @@ -333,22 +336,25 @@ private: if (!rows_added) return {}; - Block res = sample_block.cloneEmpty(); + Columns res_columns; + res_columns.reserve(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) if (column_with_null[i]) { if (key_pos == i) - res.getByPosition(i).column = makeNullable(std::move(columns[i])); + res_columns.emplace_back(makeNullable(std::move(columns[i]))); else { - const ColumnNullable & nullable_col = assert_cast(*columns[i]); - res.getByPosition(i).column = nullable_col.getNestedColumnPtr(); + const auto & nullable_col = assert_cast(*columns[i]); + res_columns.emplace_back(makeNullable(nullable_col.getNestedColumnPtr())); } } else - res.getByPosition(i).column = std::move(columns[i]); + res_columns.emplace_back(std::move(columns[i])); - return res; + UInt64 num_rows = res_columns.at(0)->size(); + return Chunk(std::move(columns), num_rows); } template @@ -438,7 +444,7 @@ private: // TODO: multiple stream read and index read -BlockInputStreams StorageJoin::read( +Pipes StorageJoin::readWithProcessors( const Names & column_names, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, @@ -447,7 +453,11 @@ BlockInputStreams StorageJoin::read( unsigned /*num_streams*/) { check(column_names); - return {std::make_shared(*join, max_block_size, getSampleBlockForColumns(column_names))}; + + Pipes pipes; + pipes.emplace_back(std::make_shared(*join, max_block_size, getSampleBlockForColumns(column_names))); + + return pipes; } } diff --git a/dbms/src/Storages/StorageJoin.h b/dbms/src/Storages/StorageJoin.h index 22500a23baa..4815f53c7e6 100644 --- a/dbms/src/Storages/StorageJoin.h +++ b/dbms/src/Storages/StorageJoin.h @@ -36,7 +36,7 @@ public: /// Verify that the data structure is suitable for implementing this type of JOIN. void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const; - BlockInputStreams read( + Pipes readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context,