// ClickHouse/src/Interpreters/TemporaryDataOnDisk.h

#pragma once
#include <atomic>
#include <memory>
#include <mutex>
#include <vector>

#include <boost/noncopyable.hpp>

#include <Interpreters/Context.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IVolume.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache.h>

namespace CurrentMetrics
{
    /// Declaration only (`extern`): the metric itself is defined in another translation unit.
    /// Used in this header as the default value of `TemporaryDataOnDisk::current_metric_scope`.
    extern const Metric TemporaryFilesUnknown;
}
namespace DB
{
/// Forward declarations and smart-pointer aliases for the types defined further down in this header.

class TemporaryDataOnDiskScope;
using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;

class TemporaryDataOnDisk;
using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;

class TemporaryFileStream;
using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
/*
2022-09-21 12:51:46 +00:00
* Used to account amount of temporary data written to disk.
* If limit is set, throws exception if limit is exceeded.
2022-09-16 11:00:28 +00:00
* Data can be nested, so parent scope accounts all data written by children.
* Scopes are: global -> per-user -> per-query -> per-purpose (sorting, aggregation, etc).
*/
class TemporaryDataOnDiskScope : boost::noncopyable
{
public:
struct StatAtomic
{
std::atomic<size_t> compressed_size;
std::atomic<size_t> uncompressed_size;
};
2022-12-06 17:27:05 +00:00
explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_)
: volume(std::move(volume_)), limit(limit_)
{}
explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * file_cache_, size_t limit_)
: volume(std::move(volume_)), file_cache(file_cache_), limit(limit_)
{}
explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
: parent(std::move(parent_)), volume(parent->volume), file_cache(parent->file_cache), limit(limit_)
{}
/// TODO: remove
/// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
2022-12-06 17:27:05 +00:00
VolumePtr getVolume() const { return volume; }
protected:
void deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta);
TemporaryDataOnDiskScopePtr parent = nullptr;
2022-12-06 10:04:15 +00:00
2022-12-06 17:27:05 +00:00
VolumePtr volume = nullptr;
FileCache * file_cache = nullptr;
StatAtomic stat;
size_t limit = 0;
};
/*
* Holds the set of temporary files.
* New file stream is created with `createStream`.
* Streams are owned by this object and will be deleted when it is deleted.
* It's a leaf node in temorarty data scope tree.
*/
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
{
2022-09-21 12:51:46 +00:00
friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
2022-10-05 16:35:10 +00:00
public:
using TemporaryDataOnDiskScope::StatAtomic;
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_);
2022-10-05 16:35:10 +00:00
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope);
2022-09-28 10:22:16 +00:00
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
2022-10-05 16:35:10 +00:00
TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
/// Write raw data directly into buffer.
/// Differences from `createStream`:
/// 1) it doesn't account data in parent scope
/// 2) returned buffer owns resources (instead of TemporaryDataOnDisk itself)
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
WriteBufferPtr createRawStream(size_t max_file_size = 0);
2022-09-28 10:22:16 +00:00
std::vector<TemporaryFileStream *> getStreams() const;
2022-09-21 12:51:46 +00:00
bool empty() const;
const StatAtomic & getStat() const { return stat; }
private:
FileSegmentsHolderPtr createCacheFile(size_t max_file_size);
TemporaryFileOnDiskHolder createRegularFile(size_t max_file_size);
2022-12-06 17:27:05 +00:00
2022-09-21 12:51:46 +00:00
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
2022-10-05 16:35:10 +00:00
typename CurrentMetrics::Metric current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
};
/*
* Data can be written into this stream and then read.
* After finish writing, call `finishWriting` and then `read` to read the data.
* Account amount of data written to disk in parent scope.
*/
2022-09-01 12:22:49 +00:00
class TemporaryFileStream : boost::noncopyable
{
public:
struct Stat
{
/// Statistics for file
/// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
2022-10-05 16:35:10 +00:00
size_t num_rows = 0;
};
2022-12-06 17:27:05 +00:00
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
2023-01-04 19:57:42 +00:00
TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_);
2022-12-06 10:04:15 +00:00
size_t write(const Block & block);
2022-12-06 17:27:05 +00:00
void flush();
Stat finishWriting();
bool isWriteFinished() const;
Block read();
2022-12-06 17:27:05 +00:00
String getPath() const;
Block getHeader() const { return header; }
2022-09-01 12:22:49 +00:00
2022-10-05 16:35:10 +00:00
/// Read finished and file released
bool isEof() const;
~TemporaryFileStream();
private:
2022-09-21 12:51:46 +00:00
void updateAllocAndCheck();
2022-09-01 12:22:49 +00:00
2022-10-05 16:35:10 +00:00
/// Release everything, close reader and writer, delete file
void release();
2022-09-01 12:22:49 +00:00
TemporaryDataOnDisk * parent;
Block header;
2022-09-01 12:22:49 +00:00
2022-12-06 17:27:05 +00:00
/// Data can be stored in file directly or in the cache
TemporaryFileOnDiskHolder file;
2023-01-04 19:57:42 +00:00
FileSegmentsHolderPtr segment_holder;
Stat stat;
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
struct InputReader;
std::unique_ptr<InputReader> in_reader;
};
}