Processors support for StorageStripeLog reading.

This commit is contained in:
Nikolai Kochetov 2020-02-14 13:57:09 +03:00
parent 0766e46bcd
commit 96b5ef8459
2 changed files with 45 additions and 28 deletions

View File

@ -28,6 +28,9 @@
#include <Storages/StorageStripeLog.h>
#include <Storages/StorageFactory.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
namespace DB
@ -45,35 +48,46 @@ namespace ErrorCodes
}
class StripeLogBlockInputStream final : public IBlockInputStream
class StripeLogSource final : public SourceWithProgress
{
public:
StripeLogBlockInputStream(StorageStripeLog & storage_, size_t max_read_buffer_size_,
/// Builds the header block passed to the SourceWithProgress base constructor.
///
/// If the index range [index_begin, index_end) is empty, the header is
/// storage.getSampleBlockForColumns(column_names). Otherwise it is assembled
/// from the columns listed in the first index block of the range: each column's
/// data type is obtained from DataTypeFactory using the type recorded in the index.
static Block getHeader(
    StorageStripeLog & storage,
    const Names & column_names,
    IndexForNativeFormat::Blocks::const_iterator index_begin,
    IndexForNativeFormat::Blocks::const_iterator index_end)
{
    /// No index block to take the column list from: use the storage's sample block for the requested columns.
    const bool range_is_empty = (index_begin == index_end);
    if (range_is_empty)
        return storage.getSampleBlockForColumns(column_names);

    /// TODO: check if possible to always return storage.getSampleBlock()

    /// Only the first block of the range is consulted for the column list.
    const auto & first_block_columns = index_begin->columns;

    Block result;
    for (const auto & index_column : first_block_columns)
    {
        auto data_type = DataTypeFactory::instance().get(index_column.type);
        result.insert(ColumnWithTypeAndName{ data_type, index_column.name });
    }

    return result;
}
StripeLogSource(StorageStripeLog & storage_, const Names & column_names, size_t max_read_buffer_size_,
std::shared_ptr<const IndexForNativeFormat> & index_,
IndexForNativeFormat::Blocks::const_iterator index_begin_,
IndexForNativeFormat::Blocks::const_iterator index_end_)
: storage(storage_), max_read_buffer_size(max_read_buffer_size_),
index(index_), index_begin(index_begin_), index_end(index_end_)
: SourceWithProgress(getHeader(storage_, column_names, index_begin_, index_end_))
, storage(storage_), max_read_buffer_size(max_read_buffer_size_)
, index(index_), index_begin(index_begin_), index_end(index_end_)
{
if (index_begin != index_end)
{
for (const auto & column : index_begin->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert(ColumnWithTypeAndName{ type, column.name });
}
}
}
/// Returns the fixed name "StripeLog".
String getName() const override { return "StripeLog"; }
Block getHeader() const override
{
return header;
}
protected:
Block readImpl() override
Chunk generate() override
{
Block res;
start();
@ -91,7 +105,7 @@ protected:
}
}
return res;
return Chunk(res.getColumns(), res.rows());
}
private:
@ -235,7 +249,7 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Strin
}
BlockInputStreams StorageStripeLog::read(
Pipes StorageStripeLog::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -249,15 +263,18 @@ BlockInputStreams StorageStripeLog::read(
NameSet column_names_set(column_names.begin(), column_names.end());
Pipes pipes;
String index_file = table_path + "index.mrk";
if (!disk->exists(index_file))
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
{
pipes.emplace_back(std::make_shared<NullSource>(getSampleBlockForColumns(column_names)));
return pipes;
}
CompressedReadBufferFromFile index_in(fullPath(disk, index_file), 0, 0, 0, INDEX_BUFFER_SIZE);
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};
BlockInputStreams res;
size_t size = index->blocks.size();
if (num_streams > size)
num_streams = size;
@ -270,13 +287,13 @@ BlockInputStreams StorageStripeLog::read(
std::advance(begin, stream * size / num_streams);
std::advance(end, (stream + 1) * size / num_streams);
res.emplace_back(std::make_shared<StripeLogBlockInputStream>(
*this, context.getSettingsRef().max_read_buffer_size, index, begin, end));
pipes.emplace_back(std::make_shared<StripeLogSource>(
*this, column_names, context.getSettingsRef().max_read_buffer_size, index, begin, end));
}
/// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.
return res;
return pipes;
}

View File

@ -18,14 +18,14 @@ namespace DB
*/
class StorageStripeLog : public ext::shared_ptr_helper<StorageStripeLog>, public IStorage
{
friend class StripeLogBlockInputStream;
friend class StripeLogSource;
friend class StripeLogBlockOutputStream;
friend struct ext::shared_ptr_helper<StorageStripeLog>;
public:
/// Returns the fixed name "StripeLog".
String getName() const override { return "StripeLog"; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,