Processors support for StorageLog reading.

This commit is contained in:
Nikolai Kochetov 2020-01-31 18:10:10 +03:00
parent 384e68d745
commit 046ff34525
3 changed files with 36 additions and 26 deletions

View File

@ -44,6 +44,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
private:
Block sample_block;
const Names key_names;

View File

@ -20,6 +20,8 @@
#include <Columns/ColumnArray.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -41,13 +43,25 @@ namespace ErrorCodes
}
class LogBlockInputStream final : public IBlockInputStream
class LogSource final : public SourceWithProgress
{
public:
LogBlockInputStream(
static Block getHeader(const NamesAndTypesList & columns)
{
Block res;
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return Nested::flatten(res);
}
LogSource(
size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_,
size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
: block_size(block_size_),
: SourceWithProgress(getHeader(columns_)),
block_size(block_size_),
columns(columns_),
storage(storage_),
mark_number(mark_number_),
@ -58,18 +72,8 @@ public:
String getName() const override { return "Log"; }
Block getHeader() const override
{
Block res;
for (const auto & name_type : columns)
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return Nested::flatten(res);
}
protected:
Block readImpl() override;
Chunk generate() override;
private:
size_t block_size;
@ -181,15 +185,15 @@ private:
};
Block LogBlockInputStream::readImpl()
Chunk LogSource::generate()
{
Block res;
if (rows_read == rows_limit)
return res;
return {};
if (storage.disk->isDirectoryEmpty(storage.table_path))
return res;
return {};
/// How many rows to read for the next block.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
@ -208,7 +212,7 @@ Block LogBlockInputStream::readImpl()
throw;
}
if (column->size())
if (!column->empty())
res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
}
@ -224,11 +228,13 @@ Block LogBlockInputStream::readImpl()
streams.clear();
}
return Nested::flatten(res);
res = Nested::flatten(res);
UInt64 num_rows = res.rows();
return Chunk(res.getColumns(), num_rows);
}
void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
void LogSource::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
{
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
@ -564,7 +570,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
return it->second.marks;
}
BlockInputStreams StorageLog::read(
Pipes StorageLog::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -579,7 +585,7 @@ BlockInputStreams StorageLog::read(
std::shared_lock<std::shared_mutex> lock(rwlock);
BlockInputStreams res;
Pipes pipes;
const Marks & marks = getMarksWithRealRowCount();
size_t marks_size = marks.size();
@ -597,7 +603,7 @@ BlockInputStreams StorageLog::read(
size_t rows_begin = mark_begin ? marks[mark_begin - 1].rows : 0;
size_t rows_end = mark_end ? marks[mark_end - 1].rows : 0;
res.emplace_back(std::make_shared<LogBlockInputStream>(
pipes.emplace_back(std::make_shared<LogSource>(
max_block_size,
all_columns,
*this,
@ -606,7 +612,7 @@ BlockInputStreams StorageLog::read(
max_read_buffer_size));
}
return res;
return pipes;
}
BlockOutputStreamPtr StorageLog::write(

View File

@ -17,14 +17,14 @@ namespace DB
*/
class StorageLog : public ext::shared_ptr_helper<StorageLog>, public IStorage
{
friend class LogBlockInputStream;
friend class LogSource;
friend class LogBlockOutputStream;
friend struct ext::shared_ptr_helper<StorageLog>;
public:
String getName() const override { return "Log"; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -32,6 +32,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(