Add module part for TemporaryFileStream.h

This commit is contained in:
Azat Khuzhin 2021-10-03 13:56:45 +03:00
parent d822ba1f3b
commit bbd2ea6ac2
2 changed files with 80 additions and 59 deletions

View File

@ -0,0 +1,71 @@
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
namespace DB
{
/// To read the data that was flushed into the temporary data file.
TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
/// Flush data from input stream into file for future reading
void TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}
TemporaryFileLazySource::TemporaryFileLazySource(const std::string & path_, const Block & header_)
: ISource(header_)
, path(path_)
, done(false)
{}
Chunk TemporaryFileLazySource::generate()
{
if (done)
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
auto block = stream->block_in->read();
if (!block)
{
done = true;
stream.reset();
}
return Chunk(block.getColumns(), block.rows());
}
}

View File

@ -1,16 +1,10 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB
{
@ -22,72 +16,28 @@ struct TemporaryFileStream
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
explicit TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
explicit TemporaryFileStream(const std::string & path);
TemporaryFileStream(const std::string & path, const Block & header_);
/// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}
static void write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);
};
class TemporaryFileLazySource : public ISource
{
public:
TemporaryFileLazySource(const std::string & path_, const Block & header_)
: ISource(header_)
, path(path_)
, done(false)
{}
TemporaryFileLazySource(const std::string & path_, const Block & header_);
String getName() const override { return "TemporaryFileLazySource"; }
protected:
Chunk generate() override
{
if (done)
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
auto block = stream->block_in->read();
if (!block)
{
done = true;
stream.reset();
}
return Chunk(block.getColumns(), block.rows());
}
Chunk generate() override;
private:
const std::string path;
Block header;
bool done;
std::unique_ptr<TemporaryFileStream> stream;
};