Processors support for StorageJoin reading.

This commit is contained in:
Nikolai Kochetov 2020-01-31 17:51:09 +03:00
parent f9db37ebf7
commit 384e68d745
3 changed files with 33 additions and 23 deletions

View File

@ -322,7 +322,7 @@ public:
private: private:
friend class NonJoinedBlockInputStream; friend class NonJoinedBlockInputStream;
friend class JoinBlockInputStream; friend class JoinSource;
std::shared_ptr<AnalyzedJoin> table_join; std::shared_ptr<AnalyzedJoin> table_join;
ASTTableJoin::Kind kind; ASTTableJoin::Kind kind;

View File

@ -14,6 +14,8 @@
#include <Poco/String.h> /// toLower #include <Poco/String.h> /// toLower
#include <Poco/File.h> #include <Poco/File.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB namespace DB
@ -235,11 +237,15 @@ size_t rawSize(const StringRef & t)
return t.size; return t.size;
} }
class JoinBlockInputStream : public IBlockInputStream class JoinSource : public SourceWithProgress
{ {
public: public:
JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_) JoinSource(const Join & parent_, UInt64 max_block_size_, Block sample_block_)
: parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_)) : SourceWithProgress(sample_block_)
, parent(parent_)
, lock(parent.data->rwlock)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
{ {
columns.resize(sample_block.columns()); columns.resize(sample_block.columns());
column_indices.resize(sample_block.columns()); column_indices.resize(sample_block.columns());
@ -263,20 +269,17 @@ public:
String getName() const override { return "Join"; } String getName() const override { return "Join"; }
Block getHeader() const override { return sample_block; }
protected: protected:
Block readImpl() override Chunk generate() override
{ {
if (parent.data->blocks.empty()) if (parent.data->blocks.empty())
return Block(); return {};
Block block; Chunk chunk;
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
[&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); })) [&](auto kind, auto strictness, auto & map) { chunk = createChunk<kind, strictness>(map); }))
throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
return block; return chunk;
} }
private: private:
@ -294,7 +297,7 @@ private:
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
Block createBlock(const Maps & maps) Chunk createChunk(const Maps & maps)
{ {
for (size_t i = 0; i < sample_block.columns(); ++i) for (size_t i = 0; i < sample_block.columns(); ++i)
{ {
@ -305,7 +308,7 @@ private:
if (key_pos == i) if (key_pos == i)
{ {
// unwrap null key column // unwrap null key column
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*columns[i]); auto & nullable_col = assert_cast<ColumnNullable &>(*columns[i]);
columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable(); columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable();
} }
else else
@ -333,22 +336,25 @@ private:
if (!rows_added) if (!rows_added)
return {}; return {};
Block res = sample_block.cloneEmpty(); Columns res_columns;
res_columns.reserve(columns.size());
for (size_t i = 0; i < columns.size(); ++i) for (size_t i = 0; i < columns.size(); ++i)
if (column_with_null[i]) if (column_with_null[i])
{ {
if (key_pos == i) if (key_pos == i)
res.getByPosition(i).column = makeNullable(std::move(columns[i])); res_columns.emplace_back(makeNullable(std::move(columns[i])));
else else
{ {
const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]); const auto & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
res.getByPosition(i).column = nullable_col.getNestedColumnPtr(); res_columns.emplace_back(makeNullable(nullable_col.getNestedColumnPtr()));
} }
} }
else else
res.getByPosition(i).column = std::move(columns[i]); res_columns.emplace_back(std::move(columns[i]));
return res; UInt64 num_rows = res_columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
} }
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map> template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
@ -438,7 +444,7 @@ private:
// TODO: multiple stream read and index read // TODO: multiple stream read and index read
BlockInputStreams StorageJoin::read( Pipes StorageJoin::readWithProcessors(
const Names & column_names, const Names & column_names,
const SelectQueryInfo & /*query_info*/, const SelectQueryInfo & /*query_info*/,
const Context & /*context*/, const Context & /*context*/,
@ -447,7 +453,11 @@ BlockInputStreams StorageJoin::read(
unsigned /*num_streams*/) unsigned /*num_streams*/)
{ {
check(column_names); check(column_names);
return {std::make_shared<JoinBlockInputStream>(*join, max_block_size, getSampleBlockForColumns(column_names))};
Pipes pipes;
pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, getSampleBlockForColumns(column_names)));
return pipes;
} }
} }

View File

@ -36,7 +36,7 @@ public:
/// Verify that the data structure is suitable for implementing this type of JOIN. /// Verify that the data structure is suitable for implementing this type of JOIN.
void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const; void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const;
BlockInputStreams read( Pipes readWithProcessors(
const Names & column_names, const Names & column_names,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
const Context & context, const Context & context,