added FilterColumnsBlockInputStream into StorageMerge [#CLICKHOUSE-3111]

This commit is contained in:
Nikolai Kochetov 2017-07-04 17:59:01 +03:00 committed by alexey-milovidov
parent f1ce8c437b
commit f1ec4cd863
2 changed files with 66 additions and 1 deletions

View File

@ -0,0 +1,58 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Columns/ColumnConst.h>
#include <iostream>
namespace DB
{
/// Removes columns other than columns_to_save_ from block
class FilterColumnsBlockInputStream : public IProfilingBlockInputStream
{
public:
FilterColumnsBlockInputStream(
BlockInputStreamPtr input_,
const Names & columns_to_save_)
: columns_to_save(columns_to_save_)
{
children.push_back(input_);
}
String getName() const override {
return "FilterColumnsBlockInputStream";
}
String getID() const override
{
std::stringstream res;
res << "FilterColumnsBlockInputStream(" << children.back()->getID();
for (const auto & it : columns_to_save)
res << ", " << it;
res << ")";
return res.str();
}
protected:
Block readImpl() override
{
Block block = children.back()->read();
if (!block)
return block;
Block filtered;
for (const auto & it : columns_to_save)
filtered.insert(std::move(block.getByName(it)));
return filtered;
}
private:
Names columns_to_save;
};
}

View File

@ -12,6 +12,7 @@
#include <Columns/ColumnString.h>
#include <Databases/IDatabase.h>
#include <DataStreams/CastTypeBlockInputStream.h>
#include <DataStreams/FilterColumnsBlockInputStream.h>
namespace DB
@ -225,7 +226,13 @@ BlockInputStreams StorageMerge::read(
if (processed_stage_in_source_tables)
processed_stage = processed_stage_in_source_tables.value();
return narrowBlockInputStreams(res, num_streams);
res = narrowBlockInputStreams(res, num_streams);
/// Added to avoid different block structure from different sources
for (auto & stream : res)
stream = std::make_shared<FilterColumnsBlockInputStream>(stream, column_names);
return res;
}
/// Construct a block consisting only of possible values of virtual columns