Processors for SystemColumns.

This commit is contained in:
Nikolai Kochetov 2020-01-24 19:13:46 +03:00
parent dc49635ed4
commit c3d0c26659
2 changed files with 30 additions and 23 deletions

View File

@ -10,6 +10,7 @@
#include <Parsers/queryToString.h>
#include <Parsers/ASTSelectQuery.h>
#include <Databases/IDatabase.h>
#include <Processors/Sources/NullSource.h>
namespace DB
@ -51,34 +52,33 @@ namespace
}
class ColumnsBlockInputStream : public IBlockInputStream
class ColumnsSource : public SourceWithProgress
{
public:
ColumnsBlockInputStream(
const std::vector<UInt8> & columns_mask_,
const Block & header_,
ColumnsSource(
std::vector<UInt8> columns_mask_,
Block header_,
UInt64 max_block_size_,
ColumnPtr databases_,
ColumnPtr tables_,
Storages storages_,
String query_id_)
: columns_mask(columns_mask_), header(header_), max_block_size(max_block_size_)
, databases(databases_), tables(tables_), storages(std::move(storages_))
: SourceWithProgress(header_)
, columns_mask(std::move(columns_mask_)), max_block_size(max_block_size_)
, databases(std::move(databases_)), tables(std::move(tables_)), storages(std::move(storages_))
, query_id(std::move(query_id_)), total_tables(tables->size())
{
}
String getName() const override { return "Columns"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override
Chunk generate() override
{
if (db_table_num >= total_tables)
return {};
Block res = header;
MutableColumns res_columns = header.cloneEmptyColumns();
MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
size_t rows_count = 0;
while (rows_count < max_block_size && db_table_num < total_tables)
@ -210,13 +210,11 @@ protected:
}
}
res.setColumns(std::move(res_columns));
return res;
return Chunk(std::move(res_columns), rows_count);
}
private:
std::vector<UInt8> columns_mask;
Block header;
UInt64 max_block_size;
ColumnPtr databases;
ColumnPtr tables;
@ -227,7 +225,7 @@ private:
};
BlockInputStreams StorageSystemColumns::read(
Pipes StorageSystemColumns::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -242,7 +240,7 @@ BlockInputStreams StorageSystemColumns::read(
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = getSampleBlock();
Block res_block;
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
@ -250,12 +248,13 @@ BlockInputStreams StorageSystemColumns::read(
if (names_set.count(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
res_block.insert(sample_block.getByPosition(i));
header.insert(sample_block.getByPosition(i));
}
}
Block block_to_filter;
Storages storages;
Pipes pipes;
{
Databases databases = context.getDatabases();
@ -278,7 +277,10 @@ BlockInputStreams StorageSystemColumns::read(
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
if (!block_to_filter.rows())
return {std::make_shared<NullBlockInputStream>(res_block)};
{
pipes.emplace_back(std::make_shared<NullSource>(header));
return pipes;
}
ColumnPtr & database_column = block_to_filter.getByName("database").column;
size_t rows = database_column->size();
@ -311,15 +313,20 @@ BlockInputStreams StorageSystemColumns::read(
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
if (!block_to_filter.rows())
return {std::make_shared<NullBlockInputStream>(res_block)};
{
pipes.emplace_back(std::make_shared<NullSource>(header));
return pipes;
}
ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
ColumnPtr filtered_table_column = block_to_filter.getByName("table").column;
return {std::make_shared<ColumnsBlockInputStream>(
std::move(columns_mask), std::move(res_block), max_block_size,
std::move(filtered_database_column), std::move(filtered_table_column), std::move(storages),
context.getCurrentQueryId())};
pipes.emplace_back(std::make_shared<ColumnsSource>(
std::move(columns_mask), std::move(header), max_block_size,
std::move(filtered_database_column), std::move(filtered_table_column), std::move(storages),
context.getCurrentQueryId()));
return pipes;
}
}

View File

@ -17,7 +17,7 @@ class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>
public:
std::string getName() const override { return "SystemColumns"; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,