Http temporary buffer integration with fs cache

This commit is contained in:
vdimir 2023-04-11 15:36:05 +00:00 committed by Vladimir C
parent 5c5aaeca6d
commit 92d0d9d4ff
10 changed files with 130 additions and 58 deletions

View File

@ -19,12 +19,14 @@
#include <sys/stat.h>
#include <Disks/DiskFactory.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <Common/randomSeed.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
@ -588,7 +590,7 @@ try
static DiskWriteCheckData data;
String tmp_template = fs::path(disk_path) / "";
{
auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
auto buf = std::make_unique<WriteBufferFromTemporaryFile>(tmp_template);
buf->write(data.data, data.PAGE_SIZE_IN_BYTES);
buf->sync();
}

View File

@ -1,5 +1,6 @@
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <fcntl.h>
@ -12,18 +13,18 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file_)
: WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600), tmp_file(std::move(tmp_file_))
{}
WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & tmp_dir)
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_)
: WriteBufferFromFile(tmp_file_->getPath(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600)
, tmp_file(std::move(tmp_file_))
{
return Ptr{new WriteBufferFromTemporaryFile(createTemporaryFile(tmp_dir))};
}
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(const String & tmp_file_path)
: WriteBufferFromFile(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600)
{
}
class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
{
public:
@ -40,11 +41,11 @@ public:
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
}
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<PocoTemporaryFile> && tmp_file_)
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, TemporaryFileOnDiskHolder && tmp_file_)
: ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_))
{}
std::unique_ptr<PocoTemporaryFile> tmp_file;
TemporaryFileOnDiskHolder tmp_file;
};

View File

@ -8,6 +8,9 @@
namespace DB
{
class TemporaryFileOnDisk;
using TemporaryFileOnDiskHolder = std::unique_ptr<TemporaryFileOnDisk>;
/// Rereadable WriteBuffer, could be used as disk buffer
/// Creates unique temporary in directory (and directory itself)
class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadableWriteBuffer
@ -15,16 +18,15 @@ class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadabl
public:
using Ptr = std::shared_ptr<WriteBufferFromTemporaryFile>;
static Ptr create(const std::string & tmp_dir);
explicit WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_);
explicit WriteBufferFromTemporaryFile(const String & tmp_file_path);
~WriteBufferFromTemporaryFile() override;
private:
explicit WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file);
std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
std::unique_ptr<PocoTemporaryFile> tmp_file;
TemporaryFileOnDiskHolder tmp_file;
friend class ReadBufferFromTemporaryWriteBuffer;
};

View File

@ -3,11 +3,11 @@
#include <stdexcept>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/copyData.h>
#include <Common/typeid_cast.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -214,7 +214,7 @@ try
std::string tmp_template = "tmp/TemporaryFileWriteBuffer/";
std::string data = makeTestArray(s);
auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
auto buf = std::make_shared<WriteBufferFromTemporaryFile>(tmp_template);
buf->write(data.data(), data.size());
std::string tmp_filename = buf->getFileName();
@ -253,7 +253,7 @@ try
testCascadeBufferRedability(makeTestArray(s),
{},
{
[=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); }
[=] (auto) { return std::make_shared<WriteBufferFromTemporaryFile>(tmp_template); }
});
testCascadeBufferRedability(makeTestArray(s),
@ -261,7 +261,7 @@ try
std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/3ul), 2, 1.5),
},
{
[=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); }
[=] (auto) { return std::make_shared<WriteBufferFromTemporaryFile>(tmp_template); }
});
}
}

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include <Interpreters/Cache/FileSegment.h>
#include <IO/SwapHelper.h>
#include <IO/ReadBufferFromFile.h>
#include <base/scope_guard.h>
@ -11,11 +12,23 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_ENOUGH_SPACE;
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter()), file_segment(file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter())
, file_segment(file_segment_)
{
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolder && segment_holder_)
: WriteBufferFromFileDecorator(
segment_holder_.file_segments.size() == 1
? segment_holder_.file_segments.front()->detachWriter()
: throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
, file_segment(segment_holder_.file_segments.front().get())
, segment_holder(std::move(segment_holder_))
{
}
@ -52,6 +65,11 @@ void WriteBufferToFileSegment::nextImpl()
file_segment->setDownloadedSize(bytes_to_write);
}
std::shared_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
{
finalize();
return std::make_shared<ReadBufferFromFile>(file_segment->getPathInLocalCache());
}
WriteBufferToFileSegment::~WriteBufferToFileSegment()
{

View File

@ -1,23 +1,34 @@
#pragma once
#include <IO/WriteBufferFromFileDecorator.h>
#include <Interpreters/Cache/FileSegment.h>
#include <IO/IReadableWriteBuffer.h>
namespace DB
{
class FileSegment;
class WriteBufferToFileSegment : public WriteBufferFromFileDecorator
class WriteBufferToFileSegment : public WriteBufferFromFileDecorator, public IReadableWriteBuffer
{
public:
explicit WriteBufferToFileSegment(FileSegment * file_segment_);
explicit WriteBufferToFileSegment(FileSegmentsHolder && segment_holder);
void nextImpl() override;
~WriteBufferToFileSegment() override;
private:
std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
/// Reference to the file segment in segment_holder if owned by this WriteBufferToFileSegment
/// or to the external file segment passed to the constructor
FileSegment * file_segment;
/// Empty if file_segment is not owned by this WriteBufferToFileSegment
FileSegmentsHolder segment_holder;
};

View File

@ -9,6 +9,7 @@
#include <Core/ProtocolDefines.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/DiskLocal.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <Common/logger_useful.h>
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
@ -54,29 +55,52 @@ TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, Cu
, current_metric_scope(metric_scope)
{}
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
WriteBufferPtr TemporaryDataOnDisk::createRawStream(size_t max_file_size)
{
if (file_cache)
return createStreamToCacheFile(header, max_file_size);
{
auto holder = createCacheFile(max_file_size);
return std::make_shared<WriteBufferToFileSegment>(std::move(holder));
}
else if (volume)
return createStreamToRegularFile(header, max_file_size);
{
auto tmp_file = createRegularFile(max_file_size);
return std::make_shared<WriteBufferFromTemporaryFile>(std::move(tmp_file));
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
}
TemporaryFileStream & TemporaryDataOnDisk::createStreamToCacheFile(const Block & header, size_t max_file_size)
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
{
if (!file_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache");
auto holder = file_cache->set(FileSegment::Key::random(), 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true));
if (file_cache)
{
auto holder = createCacheFile(max_file_size);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(holder), header, this));
return *tmp_stream;
}
else if (volume)
{
auto tmp_file = createRegularFile(max_file_size);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
return *tmp_stream;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
}
TemporaryFileStream & TemporaryDataOnDisk::createStreamToRegularFile(const Block & header, size_t max_file_size)
FileSegmentsHolder TemporaryDataOnDisk::createCacheFile(size_t max_file_size)
{
if (!file_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache");
return file_cache->set(FileSegment::Key::random(), 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true));
}
TemporaryFileOnDiskHolder TemporaryDataOnDisk::createRegularFile(size_t max_file_size)
{
if (!volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no volume");
@ -94,11 +118,7 @@ TemporaryFileStream & TemporaryDataOnDisk::createStreamToRegularFile(const Block
disk = volume->getDisk();
}
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
return *tmp_stream;
return std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
}
std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
@ -119,22 +139,11 @@ bool TemporaryDataOnDisk::empty() const
struct TemporaryFileStream::OutputWriter
{
OutputWriter(const String & path, const Block & header_)
: out_buf(std::make_unique<WriteBufferFromFile>(path))
, out_compressed_buf(*out_buf)
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", path);
}
OutputWriter(std::unique_ptr<WriteBufferToFileSegment> out_buf_, const Block & header_)
OutputWriter(std::unique_ptr<WriteBuffer> out_buf_, const Block & header_)
: out_buf(std::move(out_buf_))
, out_compressed_buf(*out_buf)
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"),
"Writing to temporary file {}",
static_cast<const WriteBufferToFileSegment *>(out_buf.get())->getFileName());
}
size_t write(const Block & block)
@ -223,8 +232,9 @@ TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(file->getPath(), header))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getPath()), header))
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", file->getPath());
}
TemporaryFileStream::TemporaryFileStream(FileSegmentsHolder && segments_, const Block & header_, TemporaryDataOnDisk * parent_)
@ -236,6 +246,8 @@ TemporaryFileStream::TemporaryFileStream(FileSegmentsHolder && segments_, const
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream can be created only from single segment");
auto & segment = segment_holder.file_segments.front();
auto out_buf = std::make_unique<WriteBufferToFileSegment>(segment.get());
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", out_buf->getFileName());
out_writer = std::make_unique<OutputWriter>(std::move(out_buf), header);
}

View File

@ -90,14 +90,20 @@ public:
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
/// Write raw data directly into buffer.
/// Differences from `createStream`:
/// 1) it doesn't account data in parent scope
/// 2) returned buffer owns resources (instead of TemporaryDataOnDisk itself)
WriteBufferPtr createRawStream(size_t max_file_size = 0);
std::vector<TemporaryFileStream *> getStreams() const;
bool empty() const;
const StatAtomic & getStat() const { return stat; }
private:
TemporaryFileStream & createStreamToCacheFile(const Block & header, size_t max_file_size);
TemporaryFileStream & createStreamToRegularFile(const Block & header, size_t max_file_size);
FileSegmentsHolder createCacheFile(size_t max_file_size);
TemporaryFileOnDiskHolder createRegularFile(size_t max_file_size);
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);

View File

@ -708,6 +708,27 @@ TEST_F(FileCacheTest, temporaryData)
ASSERT_EQ(file_cache.getUsedCacheSize(), used_size_before_attempt);
}
{
size_t before_used_size = file_cache.getUsedCacheSize();
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto write_buf_stream = tmp_data->createRawStream();
write_buf_stream->write("1234567890", 10);
write_buf_stream->write("abcde", 5);
auto read_buf = dynamic_cast<IReadableWriteBuffer *>(write_buf_stream.get())->tryGetReadBuffer();
ASSERT_GT(file_cache.getUsedCacheSize(), before_used_size + 10);
char buf[15];
size_t read_size = read_buf->read(buf, 15);
ASSERT_EQ(read_size, 15);
ASSERT_EQ(std::string(buf, 15), "1234567890abcde");
read_size = read_buf->read(buf, 15);
ASSERT_EQ(read_size, 0);
}
{
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto & stream = tmp_data->createStream(generateBlock());

View File

@ -11,10 +11,10 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Interpreters/Context.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Parsers/QueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Session.h>
@ -623,12 +623,11 @@ void HTTPHandler::processQuery(
if (buffer_until_eof)
{
const std::string tmp_path(server.context()->getGlobalTemporaryVolume()->getDisk()->getPath());
const std::string tmp_path_template(fs::path(tmp_path) / "http_buffers/");
auto tmp_data = std::make_shared<TemporaryDataOnDisk>(server.context()->getTempDataOnDisk());
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
auto create_tmp_disk_buffer = [tmp_data] (const WriteBufferPtr &) -> WriteBufferPtr
{
return WriteBufferFromTemporaryFile::create(tmp_path_template);
return tmp_data->createRawStream(0);
};
cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));