Processors for StorageSystemDisks and StorageSystemDetachedParts.

This commit is contained in:
Nikolai Kochetov 2020-01-24 20:49:14 +03:00
parent c3d0c26659
commit ffa3b3c665
3 changed files with 24 additions and 13 deletions

View File

@ -7,6 +7,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/StorageSystemPartsBase.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace DB
{
@ -42,7 +43,7 @@ protected:
}});
}
BlockInputStreams read(
Pipes readWithProcessors(
const Names & /* column_names */,
const SelectQueryInfo & query_info,
const Context & context,
@ -74,8 +75,12 @@ protected:
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(
block.cloneWithColumns(std::move(new_columns))));
UInt64 num_rows = new_columns.at(0)->size();
Chunk chunk(std::move(new_columns), num_rows);
Pipes pipes;
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(std::move(block), std::move(chunk)));
return pipes;
}
};

View File

@ -1,5 +1,6 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemDisks.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace DB
{
@ -22,7 +23,7 @@ StorageSystemDisks::StorageSystemDisks(const std::string & name_)
}));
}
BlockInputStreams StorageSystemDisks::read(
Pipes StorageSystemDisks::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -49,15 +50,20 @@ BlockInputStreams StorageSystemDisks::read(
col_keep->insert(disk_ptr->getKeepingFreeSpace());
}
Block res = getSampleBlock().cloneEmpty();
size_t col_num = 0;
res.getByPosition(col_num++).column = std::move(col_name);
res.getByPosition(col_num++).column = std::move(col_path);
res.getByPosition(col_num++).column = std::move(col_free);
res.getByPosition(col_num++).column = std::move(col_total);
res.getByPosition(col_num++).column = std::move(col_keep);
Columns columns;
columns.emplace_back(std::move(col_name));
columns.emplace_back(std::move(col_path));
columns.emplace_back(std::move(col_free));
columns.emplace_back(std::move(col_total));
columns.emplace_back(std::move(col_keep));
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
UInt64 num_rows = columns.at(0)->size();
Chunk chunk(std::move(columns), num_rows);
Pipes pipes;
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(getSampleBlock(), std::move(chunk)));
return pipes;
}
}

View File

@ -20,7 +20,7 @@ class StorageSystemDisks : public ext::shared_ptr_helper<StorageSystemDisks>, pu
public:
std::string getName() const override { return "SystemDisks"; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,