2022-08-31 12:53:26 +00:00
|
|
|
#include <Interpreters/TemporaryDataOnDisk.h>
|
|
|
|
|
2022-08-31 17:17:31 +00:00
|
|
|
#include <IO/WriteBufferFromFile.h>
|
|
|
|
#include <IO/ReadBufferFromFile.h>
|
|
|
|
#include <Compression/CompressedWriteBuffer.h>
|
|
|
|
#include <Compression/CompressedReadBuffer.h>
|
|
|
|
#include <Formats/NativeWriter.h>
|
|
|
|
#include <Formats/NativeReader.h>
|
2022-09-01 12:22:49 +00:00
|
|
|
#include <Core/ProtocolDefines.h>
|
2022-12-06 16:51:08 +00:00
|
|
|
#include <Disks/SingleDiskVolume.h>
|
|
|
|
#include <Disks/DiskLocal.h>
|
2022-09-01 12:22:49 +00:00
|
|
|
|
|
|
|
#include <Common/logger_useful.h>
|
2022-08-31 12:53:26 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
2022-09-01 12:22:49 +00:00
|
|
|
|
2022-08-31 12:53:26 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2022-09-21 12:51:46 +00:00
|
|
|
extern const int TOO_MANY_ROWS_OR_BYTES;
|
2022-08-31 17:17:31 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2022-09-01 12:22:49 +00:00
|
|
|
extern const int NOT_ENOUGH_SPACE;
|
2022-08-31 12:53:26 +00:00
|
|
|
}
|
|
|
|
|
2022-12-06 16:51:08 +00:00
|
|
|
static VolumePtr createLocalSingleDiskVolume(const std::string & path)
|
|
|
|
{
|
|
|
|
auto disk = std::make_shared<DiskLocal>("_tmp_default", path, 0);
|
|
|
|
VolumePtr volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
|
|
|
|
return volume;
|
|
|
|
}
|
|
|
|
|
|
|
|
TemporaryDataOnDiskScope::TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * cache_, size_t limit_)
|
|
|
|
: volume(volume_), cache(cache_), limit(limit_)
|
|
|
|
{
|
|
|
|
if (!volume)
|
|
|
|
{
|
|
|
|
if (!cache)
|
|
|
|
throw Exception("Volume and cache are not specified", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
volume = createLocalSingleDiskVolume(cache->getBasePath());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
TemporaryDataOnDiskScope::TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
|
|
|
|
: parent(parent_), volume(parent->volume), cache(parent->cache), limit(limit_)
|
|
|
|
{}
|
|
|
|
|
2022-10-07 10:46:45 +00:00
|
|
|
void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta)
|
2022-08-31 12:53:26 +00:00
|
|
|
{
|
|
|
|
if (parent)
|
2022-09-21 12:51:46 +00:00
|
|
|
parent->deltaAllocAndCheck(compressed_delta, uncompressed_delta);
|
|
|
|
|
2022-08-31 12:53:26 +00:00
|
|
|
|
2022-09-21 12:51:46 +00:00
|
|
|
/// check that we don't go negative
|
|
|
|
if ((compressed_delta < 0 && stat.compressed_size < static_cast<size_t>(-compressed_delta)) ||
|
|
|
|
(uncompressed_delta < 0 && stat.uncompressed_size < static_cast<size_t>(-uncompressed_delta)))
|
|
|
|
{
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Negative temporary data size");
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t new_consumprion = stat.compressed_size + compressed_delta;
|
|
|
|
if (compressed_delta > 0 && limit && new_consumprion > limit)
|
2022-12-06 10:04:15 +00:00
|
|
|
throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES,
|
|
|
|
"Limit for temporary files size exceeded (would consume {} / {} bytes)", new_consumprion, limit);
|
2022-09-21 12:51:46 +00:00
|
|
|
|
|
|
|
stat.compressed_size += compressed_delta;
|
|
|
|
stat.uncompressed_size += uncompressed_delta;
|
2022-08-31 12:53:26 +00:00
|
|
|
}
|
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
VolumePtr TemporaryDataOnDiskScope::getVolume() const
|
|
|
|
{
|
|
|
|
if (!volume)
|
|
|
|
throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
return volume;
|
|
|
|
}
|
|
|
|
|
2022-10-05 16:35:10 +00:00
|
|
|
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
|
2022-08-31 12:53:26 +00:00
|
|
|
{
|
2022-12-06 16:51:08 +00:00
|
|
|
if (!volume)
|
|
|
|
throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
DiskPtr disk;
|
|
|
|
/// If cache is not passed reserve space directly in disk
|
|
|
|
/// Otherwise use FileCachePlaceholder that will reserve space in cache on top of this disk
|
|
|
|
if (max_file_size > 0 && !cache)
|
|
|
|
{
|
|
|
|
auto reservation = volume->reserve(max_file_size);
|
|
|
|
if (!reservation)
|
|
|
|
throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE);
|
|
|
|
disk = reservation->getDisk();
|
|
|
|
}
|
2022-08-31 12:53:26 +00:00
|
|
|
else
|
2022-12-06 16:51:08 +00:00
|
|
|
{
|
|
|
|
disk = volume->getDisk();
|
|
|
|
}
|
2022-09-15 10:19:39 +00:00
|
|
|
|
2022-12-06 16:51:08 +00:00
|
|
|
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
|
|
|
|
std::unique_ptr<ISpacePlaceholder> cache_placeholder = nullptr;
|
|
|
|
if (cache)
|
|
|
|
{
|
|
|
|
cache_placeholder = std::make_unique<FileCachePlaceholder>(cache, tmp_file->getPath());
|
|
|
|
cache_placeholder->reserveCapacity(max_file_size);
|
|
|
|
}
|
|
|
|
|
|
|
|
auto tmp_stream = std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, std::move(cache_placeholder), this);
|
2022-09-15 10:19:39 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2022-12-06 10:04:15 +00:00
|
|
|
return *streams.emplace_back(std::move(tmp_stream));
|
2022-08-31 12:53:26 +00:00
|
|
|
}
|
|
|
|
|
2022-09-28 10:22:16 +00:00
|
|
|
std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
|
2022-09-15 10:19:39 +00:00
|
|
|
{
|
|
|
|
std::vector<TemporaryFileStream *> res;
|
|
|
|
std::lock_guard lock(mutex);
|
2022-09-28 10:22:16 +00:00
|
|
|
res.reserve(streams.size());
|
|
|
|
for (const auto & stream : streams)
|
2022-09-15 10:19:39 +00:00
|
|
|
res.push_back(stream.get());
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2022-09-21 12:51:46 +00:00
|
|
|
bool TemporaryDataOnDisk::empty() const
|
|
|
|
{
|
|
|
|
std::lock_guard lock(mutex);
|
|
|
|
return streams.empty();
|
|
|
|
}
|
|
|
|
|
2022-08-31 12:53:26 +00:00
|
|
|
struct TemporaryFileStream::OutputWriter
|
|
|
|
{
|
2022-09-15 10:19:39 +00:00
|
|
|
OutputWriter(const String & path, const Block & header_)
|
2022-08-31 12:53:26 +00:00
|
|
|
: out_file_buf(path)
|
|
|
|
, out_compressed_buf(out_file_buf)
|
2022-09-15 10:19:39 +00:00
|
|
|
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
|
2022-08-31 12:53:26 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
size_t write(const Block & block)
|
2022-08-31 12:53:26 +00:00
|
|
|
{
|
|
|
|
if (finalized)
|
|
|
|
throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
|
2022-12-06 10:04:15 +00:00
|
|
|
size_t written_bytes = out_writer.write(block);
|
2022-10-05 16:35:10 +00:00
|
|
|
num_rows += block.rows();
|
2022-12-06 10:04:15 +00:00
|
|
|
return written_bytes;
|
2022-08-31 12:53:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void finalize()
|
|
|
|
{
|
|
|
|
if (finalized)
|
|
|
|
return;
|
2022-09-21 12:51:46 +00:00
|
|
|
|
|
|
|
/// if we called finalize() explicitly, and got an exception,
|
|
|
|
/// we don't want to get it again in the destructor, so set finalized flag first
|
2022-09-14 11:21:25 +00:00
|
|
|
finalized = true;
|
2022-09-21 12:51:46 +00:00
|
|
|
|
|
|
|
out_writer.flush();
|
|
|
|
out_compressed_buf.finalize();
|
|
|
|
out_file_buf.finalize();
|
2022-08-31 12:53:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
~OutputWriter()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
finalize();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
WriteBufferFromFile out_file_buf;
|
|
|
|
CompressedWriteBuffer out_compressed_buf;
|
|
|
|
NativeWriter out_writer;
|
|
|
|
|
2022-10-05 16:35:10 +00:00
|
|
|
std::atomic_size_t num_rows = 0;
|
|
|
|
|
2022-08-31 12:53:26 +00:00
|
|
|
bool finalized = false;
|
|
|
|
};
|
|
|
|
|
|
|
|
struct TemporaryFileStream::InputReader
|
|
|
|
{
|
2022-09-15 10:19:39 +00:00
|
|
|
InputReader(const String & path, const Block & header_)
|
|
|
|
: in_file_buf(path)
|
|
|
|
, in_compressed_buf(in_file_buf)
|
|
|
|
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
|
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
explicit InputReader(const String & path)
|
2022-08-31 12:53:26 +00:00
|
|
|
: in_file_buf(path)
|
|
|
|
, in_compressed_buf(in_file_buf)
|
2022-09-15 10:19:39 +00:00
|
|
|
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
|
|
|
|
{
|
|
|
|
}
|
2022-08-31 17:17:31 +00:00
|
|
|
|
|
|
|
Block read() { return in_reader.read(); }
|
2022-08-31 12:53:26 +00:00
|
|
|
|
|
|
|
ReadBufferFromFile in_file_buf;
|
|
|
|
CompressedReadBuffer in_compressed_buf;
|
|
|
|
NativeReader in_reader;
|
|
|
|
};
|
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
TemporaryFileStream::TemporaryFileStream(
|
|
|
|
TemporaryFileHolder file_,
|
|
|
|
const Block & header_,
|
|
|
|
std::unique_ptr<ISpacePlaceholder> space_holder_,
|
|
|
|
TemporaryDataOnDisk * parent_)
|
2022-08-31 17:17:31 +00:00
|
|
|
: parent(parent_)
|
2022-09-15 10:19:39 +00:00
|
|
|
, header(header_)
|
2022-09-01 12:22:49 +00:00
|
|
|
, file(std::move(file_))
|
2022-12-06 10:04:15 +00:00
|
|
|
, space_holder(std::move(space_holder_))
|
2022-10-05 16:35:10 +00:00
|
|
|
, out_writer(std::make_unique<OutputWriter>(file->getPath(), header))
|
2022-08-31 17:17:31 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
size_t TemporaryFileStream::write(const Block & block)
|
2022-08-31 17:17:31 +00:00
|
|
|
{
|
|
|
|
if (!out_writer)
|
2022-09-02 14:33:54 +00:00
|
|
|
throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR);
|
2022-08-31 17:17:31 +00:00
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
size_t block_size_in_memory = block.bytes();
|
|
|
|
|
|
|
|
if (space_holder)
|
|
|
|
space_holder->reserveCapacity(block_size_in_memory);
|
|
|
|
|
2022-09-21 12:51:46 +00:00
|
|
|
updateAllocAndCheck();
|
2022-12-06 10:04:15 +00:00
|
|
|
|
|
|
|
size_t bytes_written = out_writer->write(block);
|
|
|
|
if (space_holder)
|
|
|
|
space_holder->setUsed(bytes_written);
|
|
|
|
|
|
|
|
return bytes_written;
|
2022-08-31 17:17:31 +00:00
|
|
|
}
|
|
|
|
|
2022-09-21 13:44:35 +00:00
|
|
|
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
|
2022-08-31 17:17:31 +00:00
|
|
|
{
|
2022-10-05 16:35:10 +00:00
|
|
|
if (isWriteFinished())
|
|
|
|
return stat;
|
|
|
|
|
2022-08-31 17:17:31 +00:00
|
|
|
if (out_writer)
|
|
|
|
{
|
|
|
|
out_writer->finalize();
|
2022-09-21 12:51:46 +00:00
|
|
|
/// The amount of written data can be changed after finalization, some buffers can be flushed
|
|
|
|
/// Need to update the stat
|
|
|
|
updateAllocAndCheck();
|
2022-09-01 12:22:49 +00:00
|
|
|
out_writer.reset();
|
2022-08-31 17:17:31 +00:00
|
|
|
|
2022-09-22 17:39:30 +00:00
|
|
|
/// reader will be created at the first read call, not to consume memory before it is needed
|
2022-08-31 17:17:31 +00:00
|
|
|
}
|
2022-09-15 10:19:39 +00:00
|
|
|
return stat;
|
2022-08-31 17:17:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool TemporaryFileStream::isWriteFinished() const
|
|
|
|
{
|
2022-09-22 17:39:30 +00:00
|
|
|
assert(in_reader == nullptr || out_writer == nullptr);
|
2022-08-31 17:17:31 +00:00
|
|
|
return out_writer == nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
Block TemporaryFileStream::read()
|
|
|
|
{
|
2022-09-21 12:51:46 +00:00
|
|
|
if (!isWriteFinished())
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
|
2022-08-31 17:17:31 +00:00
|
|
|
|
2022-10-05 16:35:10 +00:00
|
|
|
if (isEof())
|
2022-09-22 17:39:30 +00:00
|
|
|
return {};
|
|
|
|
|
|
|
|
if (!in_reader)
|
|
|
|
{
|
2022-10-05 16:35:10 +00:00
|
|
|
in_reader = std::make_unique<InputReader>(file->getPath(), header);
|
2022-09-22 17:39:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Block block = in_reader->read();
|
|
|
|
if (!block)
|
|
|
|
{
|
|
|
|
/// finalize earlier to release resources, do not wait for the destructor
|
2022-10-05 16:35:10 +00:00
|
|
|
this->release();
|
2022-09-22 17:39:30 +00:00
|
|
|
}
|
|
|
|
return block;
|
2022-08-31 17:17:31 +00:00
|
|
|
}
|
|
|
|
|
2022-09-21 12:51:46 +00:00
|
|
|
void TemporaryFileStream::updateAllocAndCheck()
|
2022-09-01 12:22:49 +00:00
|
|
|
{
|
2022-09-14 11:21:25 +00:00
|
|
|
assert(out_writer);
|
2022-09-01 12:22:49 +00:00
|
|
|
size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes();
|
|
|
|
size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes();
|
|
|
|
|
2022-09-15 10:19:39 +00:00
|
|
|
if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
|
2022-09-01 12:22:49 +00:00
|
|
|
{
|
2022-09-21 12:51:46 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
2022-09-01 12:22:49 +00:00
|
|
|
"Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
|
2022-10-05 16:35:10 +00:00
|
|
|
file->getPath(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
|
2022-09-01 12:22:49 +00:00
|
|
|
}
|
|
|
|
|
2022-09-21 12:51:46 +00:00
|
|
|
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
|
2022-09-15 10:19:39 +00:00
|
|
|
stat.compressed_size = new_compressed_size;
|
|
|
|
stat.uncompressed_size = new_uncompressed_size;
|
2022-10-05 16:35:10 +00:00
|
|
|
stat.num_rows = out_writer->num_rows;
|
2022-09-01 12:22:49 +00:00
|
|
|
}
|
|
|
|
|
2022-10-05 16:35:10 +00:00
|
|
|
bool TemporaryFileStream::isEof() const
|
2022-09-22 17:39:30 +00:00
|
|
|
{
|
|
|
|
return file == nullptr;
|
|
|
|
}
|
|
|
|
|
2022-10-05 16:35:10 +00:00
|
|
|
void TemporaryFileStream::release()
|
2022-09-22 17:39:30 +00:00
|
|
|
{
|
|
|
|
if (file)
|
|
|
|
{
|
|
|
|
file.reset();
|
|
|
|
parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (in_reader)
|
|
|
|
in_reader.reset();
|
|
|
|
|
|
|
|
if (out_writer)
|
|
|
|
{
|
|
|
|
out_writer->finalize();
|
|
|
|
out_writer.reset();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-01 12:22:49 +00:00
|
|
|
TemporaryFileStream::~TemporaryFileStream()
|
2022-08-31 17:17:31 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2022-10-05 16:35:10 +00:00
|
|
|
release();
|
2022-08-31 17:17:31 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
2022-09-21 12:51:46 +00:00
|
|
|
assert(false); /// deltaAllocAndCheck with negative can't throw exception
|
2022-08-31 17:17:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-31 12:53:26 +00:00
|
|
|
}
|