Merge pull request #66606 from ClickHouse/vdimir/tmp_data_refactoring

Refactor TempDataOnDisk
This commit is contained in:
Alexander Gololobov 2024-11-15 13:23:15 +00:00 committed by GitHub
commit bb4549eb08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 660 additions and 684 deletions

View File

@ -49,6 +49,7 @@
M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
M(TemporaryFilesForMerge, "Number of temporary files for vertical merge") \
M(TemporaryFilesUnknown, "Number of temporary files created without known purpose") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
M(RemoteRead, "Number of read with remote reader in fly") \

View File

@ -69,7 +69,7 @@ static void testCascadeBufferRedability(
auto rbuf = wbuf_readable.tryGetReadBuffer();
ASSERT_FALSE(!rbuf);
concat.appendBuffer(wrapReadBufferPointer(std::move(rbuf)));
concat.appendBuffer(std::move(rbuf));
}
std::string decoded_data;

View File

@ -335,7 +335,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_)
, keys_positions(calculateKeysPositions(header, params_))
, params(params_)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope, CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
, tmp_data(params.tmp_data_scope ? params.tmp_data_scope->childScope(CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
, min_bytes_for_prefetch(getMinBytesForPrefetch())
{
/// Use query-level memory tracker
@ -1519,10 +1519,15 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
Stopwatch watch;
size_t rows = data_variants.size();
auto & out_stream = tmp_data->createStream(getHeader(false), max_temp_file_size);
auto & out_stream = [this, max_temp_file_size]() -> TemporaryBlockStreamHolder &
{
std::lock_guard lk(tmp_files_mutex);
return tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size);
}();
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.getPath());
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.getHolder()->describeFilePath());
/// Flush only two-level data and possibly overflow data.
@ -1639,11 +1644,24 @@ Block Aggregator::convertOneBucketToBlock(AggregatedDataVariants & variants, Are
return block;
}
std::list<TemporaryBlockStreamHolder> Aggregator::detachTemporaryData()
{
std::lock_guard lk(tmp_files_mutex);
return std::move(tmp_files);
}
bool Aggregator::hasTemporaryData() const
{
std::lock_guard lk(tmp_files_mutex);
return !tmp_files.empty();
}
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
TemporaryFileStream & out) const
TemporaryBlockStreamHolder & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
@ -1660,14 +1678,14 @@ void Aggregator::writeToTemporaryFileImpl(
for (UInt32 bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
out.write(block);
out->write(block);
update_max_sizes(block);
}
if (params.overflow_row)
{
Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
out.write(block);
out->write(block);
update_max_sizes(block);
}

View File

@ -309,9 +309,9 @@ public:
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const;
bool hasTemporaryData() const { return tmp_data && !tmp_data->empty(); }
bool hasTemporaryData() const;
const TemporaryDataOnDisk & getTemporaryData() const { return *tmp_data; }
std::list<TemporaryBlockStreamHolder> detachTemporaryData();
/// Get data structure of the result.
Block getHeader(bool final) const;
@ -355,7 +355,9 @@ private:
LoggerPtr log = getLogger("Aggregator");
/// For external aggregation.
TemporaryDataOnDiskPtr tmp_data;
TemporaryDataOnDiskScopePtr tmp_data;
mutable std::mutex tmp_files_mutex;
mutable std::list<TemporaryBlockStreamHolder> tmp_files TSA_GUARDED_BY(tmp_files_mutex);
size_t min_bytes_for_prefetch = 0;
@ -456,7 +458,7 @@ private:
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
TemporaryFileStream & out) const;
TemporaryBlockStreamHolder & out) const;
/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>

View File

@ -364,6 +364,8 @@ struct ContextSharedPart : boost::noncopyable
/// Child scopes for more fine-grained accounting are created per user/query/etc.
/// Initialized once during server startup.
TemporaryDataOnDiskScopePtr root_temp_data_on_disk TSA_GUARDED_BY(mutex);
/// TODO: remove, use only root_temp_data_on_disk
VolumePtr temporary_volume_legacy;
mutable OnceFlag async_loader_initialized;
mutable std::unique_ptr<AsyncLoader> async_loader; /// Thread pool for asynchronous initialization of arbitrary DAG of `LoadJob`s (used for tables loading)
@ -799,10 +801,9 @@ struct ContextSharedPart : boost::noncopyable
}
/// Special volumes might also use disks that require shutdown.
auto & tmp_data = root_temp_data_on_disk;
if (tmp_data && tmp_data->getVolume())
if (temporary_volume_legacy)
{
auto & disks = tmp_data->getVolume()->getDisks();
auto & disks = temporary_volume_legacy->getDisks();
for (auto & disk : disks)
disk->shutdown();
}
@ -1184,8 +1185,8 @@ VolumePtr Context::getGlobalTemporaryVolume() const
SharedLockGuard lock(shared->mutex);
/// Calling this method we just bypass the `temp_data_on_disk` and write to the file on the volume directly.
/// Volume is the same for `root_temp_data_on_disk` (always set) and `temp_data_on_disk` (if it's set).
if (shared->root_temp_data_on_disk)
return shared->root_temp_data_on_disk->getVolume();
if (shared->temporary_volume_legacy)
return shared->temporary_volume_legacy;
return nullptr;
}
@ -1273,6 +1274,10 @@ try
/// We skip directories (for example, 'http_buffers' - it's used for buffering of the results) and all other file types.
}
}
else
{
fs::create_directories(path);
}
}
catch (...)
{
@ -1306,7 +1311,8 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
temporary_data_on_disk_settings.max_size_on_disk = max_size;
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), std::move(temporary_data_on_disk_settings));
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, std::move(temporary_data_on_disk_settings));
shared->temporary_volume_legacy = volume;
}
void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size)
@ -1354,7 +1360,8 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
temporary_data_on_disk_settings.max_size_on_disk = max_size;
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), std::move(temporary_data_on_disk_settings));
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, std::move(temporary_data_on_disk_settings));
shared->temporary_volume_legacy = volume;
}
void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
@ -1378,7 +1385,8 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
temporary_data_on_disk_settings.max_size_on_disk = max_size;
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), file_cache.get(), std::move(temporary_data_on_disk_settings));
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(file_cache.get(), std::move(temporary_data_on_disk_settings));
shared->temporary_volume_legacy = volume;
}
void Context::setFlagsPath(const String & path)

View File

@ -41,15 +41,15 @@ namespace
class AccumulatedBlockReader
{
public:
AccumulatedBlockReader(TemporaryFileStream & reader_,
AccumulatedBlockReader(TemporaryBlockStreamReaderHolder reader_,
std::mutex & mutex_,
size_t result_block_size_ = 0)
: reader(reader_)
: reader(std::move(reader_))
, mutex(mutex_)
, result_block_size(result_block_size_)
{
if (!reader.isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Reading not finished file");
if (!reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Reader is nullptr");
}
Block read()
@ -63,7 +63,7 @@ namespace
size_t rows_read = 0;
do
{
Block block = reader.read();
Block block = reader->read();
rows_read += block.rows();
if (!block)
{
@ -81,7 +81,7 @@ namespace
}
private:
TemporaryFileStream & reader;
TemporaryBlockStreamReaderHolder reader;
std::mutex & mutex;
const size_t result_block_size;
@ -124,12 +124,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable
public:
using BucketLock = std::unique_lock<std::mutex>;
explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, LoggerPtr log_)
: idx{bucket_index_}
, left_file{left_file_}
, right_file{right_file_}
, state{State::WRITING_BLOCKS}
, log{log_}
explicit FileBucket(size_t bucket_index_, TemporaryBlockStreamHolder left_file_, TemporaryBlockStreamHolder right_file_, LoggerPtr log_)
: idx(bucket_index_)
, left_file(std::move(left_file_))
, right_file(std::move(right_file_))
, state(State::WRITING_BLOCKS)
, log(log_)
{
}
@ -157,12 +157,6 @@ public:
return addBlockImpl(block, right_file, lock);
}
bool finished() const
{
std::unique_lock<std::mutex> left_lock(left_file_mutex);
return left_file.isEof();
}
bool empty() const { return is_empty.load(); }
AccumulatedBlockReader startJoining()
@ -172,24 +166,21 @@ public:
std::unique_lock<std::mutex> left_lock(left_file_mutex);
std::unique_lock<std::mutex> right_lock(right_file_mutex);
left_file.finishWriting();
right_file.finishWriting();
state = State::JOINING_BLOCKS;
}
return AccumulatedBlockReader(right_file, right_file_mutex);
return AccumulatedBlockReader(right_file.getReadStream(), right_file_mutex);
}
AccumulatedBlockReader getLeftTableReader()
{
ensureState(State::JOINING_BLOCKS);
return AccumulatedBlockReader(left_file, left_file_mutex);
return AccumulatedBlockReader(left_file.getReadStream(), left_file_mutex);
}
const size_t idx;
private:
bool addBlockImpl(const Block & block, TemporaryFileStream & writer, std::unique_lock<std::mutex> & lock)
bool addBlockImpl(const Block & block, TemporaryBlockStreamHolder & writer, std::unique_lock<std::mutex> & lock)
{
ensureState(State::WRITING_BLOCKS);
@ -199,7 +190,7 @@ private:
if (block.rows())
is_empty = false;
writer.write(block);
writer->write(block);
return true;
}
@ -217,8 +208,8 @@ private:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid state transition, expected {}, got {}", expected, state.load());
}
TemporaryFileStream & left_file;
TemporaryFileStream & right_file;
TemporaryBlockStreamHolder left_file;
TemporaryBlockStreamHolder right_file;
mutable std::mutex left_file_mutex;
mutable std::mutex right_file_mutex;
@ -274,7 +265,7 @@ GraceHashJoin::GraceHashJoin(
, max_num_buckets{context->getSettingsRef()[Setting::grace_hash_join_max_buckets]}
, left_key_names(table_join->getOnlyClause().key_names_left)
, right_key_names(table_join->getOnlyClause().key_names_right)
, tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
, tmp_data(tmp_data_->childScope(CurrentMetrics::TemporaryFilesForJoin))
, hash_join(makeInMemoryJoin("grace0"))
, hash_join_sample_block(hash_join->savedBlockSample())
{
@ -398,10 +389,10 @@ void GraceHashJoin::addBuckets(const size_t bucket_count)
for (size_t i = 0; i < bucket_count; ++i)
try
{
auto & left_file = tmp_data->createStream(left_sample_block);
auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
TemporaryBlockStreamHolder left_file(left_sample_block, tmp_data.get());
TemporaryBlockStreamHolder right_file(prepareRightBlock(right_sample_block), tmp_data.get());
BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, left_file, right_file, log);
BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, std::move(left_file), std::move(right_file), log);
tmp_buckets.emplace_back(std::move(new_bucket));
}
catch (...)
@ -632,12 +623,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
{
current_bucket = buckets[bucket_idx].get();
if (current_bucket->finished() || current_bucket->empty())
if (current_bucket->empty())
{
LOG_TRACE(log, "Skipping {} {} bucket {}",
current_bucket->finished() ? "finished" : "",
current_bucket->empty() ? "empty" : "",
bucket_idx);
LOG_TRACE(log, "Skipping empty bucket {}", bucket_idx);
continue;
}

View File

@ -132,7 +132,7 @@ private:
Names left_key_names;
Names right_key_names;
TemporaryDataOnDiskPtr tmp_data;
TemporaryDataOnDiskScopePtr tmp_data;
Buckets buckets;
mutable SharedMutex rehash_mutex;

View File

@ -35,11 +35,6 @@
#include <Interpreters/HashJoin/HashJoinMethods.h>
#include <Interpreters/HashJoin/JoinUsedFlags.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForJoin;
}
namespace DB
{
@ -64,7 +59,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
{
size_t left_position;
size_t right_block;
std::unique_ptr<TemporaryFileStream::Reader> reader;
std::optional<TemporaryBlockStreamReaderHolder> reader;
};
@ -106,10 +101,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, instance_id(instance_id_)
, asof_inequality(table_join->getAsofInequality())
, data(std::make_shared<RightTableData>())
, tmp_data(
table_join_->getTempDataOnDisk()
? std::make_unique<TemporaryDataOnDisk>(table_join_->getTempDataOnDisk(), CurrentMetrics::TemporaryFilesForJoin)
: nullptr)
, tmp_data(table_join_->getTempDataOnDisk())
, right_sample_block(right_sample_block_)
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
@ -520,11 +512,10 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
&& (tmp_stream || (max_bytes_in_join && getTotalByteCount() + block_to_save.allocatedBytes() >= max_bytes_in_join)
|| (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
{
if (tmp_stream == nullptr)
{
tmp_stream = &tmp_data->createStream(right_sample_block);
}
tmp_stream->write(block_to_save);
if (!tmp_stream)
tmp_stream.emplace(right_sample_block, tmp_data.get());
tmp_stream.value()->write(block_to_save);
return true;
}
@ -730,13 +721,14 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
{
size_t start_left_row = 0;
size_t start_right_block = 0;
std::unique_ptr<TemporaryFileStream::Reader> reader = nullptr;
std::optional<TemporaryBlockStreamReaderHolder> reader;
if (not_processed)
{
auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed);
start_left_row = continuation.left_position;
start_right_block = continuation.right_block;
reader = std::move(continuation.reader);
if (continuation.reader)
reader = std::move(*continuation.reader);
not_processed.reset();
}
@ -804,12 +796,10 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (tmp_stream && rows_added <= max_joined_block_rows)
{
if (reader == nullptr)
{
tmp_stream->finishWritingAsyncSafe();
if (!reader)
reader = tmp_stream->getReadStream();
}
while (auto block_right = reader->read())
while (auto block_right = reader.value()->read())
{
++block_number;
process_right_block(block_right);

View File

@ -423,8 +423,9 @@ private:
std::vector<Sizes> key_sizes;
/// Needed to do external cross join
TemporaryDataOnDiskPtr tmp_data;
TemporaryFileStream* tmp_stream{nullptr};
TemporaryDataOnDiskScopePtr tmp_data;
std::optional<TemporaryBlockStreamHolder> tmp_stream;
mutable std::once_flag finish_writing;
/// Block with columns from the right-side table.
Block right_sample_block;

View File

@ -20,6 +20,11 @@
#include <memory>
#include <base/types.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForJoin;
}
namespace DB
{
@ -265,7 +270,7 @@ public:
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data; }
TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data ? tmp_data->childScope(CurrentMetrics::TemporaryFilesForJoin) : nullptr; }
ActionsDAG createJoinedBlockActions(ContextPtr context) const;

View File

@ -9,13 +9,16 @@
#include <Interpreters/Cache/FileCache.h>
#include <Formats/NativeWriter.h>
#include <Core/ProtocolDefines.h>
#include <Disks/IDisk.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/DiskLocal.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <Core/Defines.h>
#include <Common/formatReadable.h>
#include <Common/NaNUtils.h>
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include "Common/Exception.h"
#include <Common/Exception.h>
namespace ProfileEvents
{
@ -27,11 +30,293 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int INVALID_STATE;
extern const int LOGICAL_ERROR;
extern const int NOT_ENOUGH_SPACE;
extern const int TOO_MANY_ROWS_OR_BYTES;
}
namespace
{
inline CompressionCodecPtr getCodec(const TemporaryDataOnDiskSettings & settings)
{
if (settings.compression_codec.empty())
return CompressionCodecFactory::instance().get("NONE");
return CompressionCodecFactory::instance().get(settings.compression_codec);
}
}
TemporaryFileHolder::TemporaryFileHolder()
{
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
}
class TemporaryFileInLocalCache : public TemporaryFileHolder
{
public:
explicit TemporaryFileInLocalCache(FileCache & file_cache, size_t reserve_size = 0)
{
const auto key = FileSegment::Key::random();
LOG_TRACE(getLogger("TemporaryFileInLocalCache"), "Creating temporary file in cache with key {}", key);
segment_holder = file_cache.set(
key, 0, std::max<size_t>(1, reserve_size),
CreateFileSegmentSettings(FileSegmentKind::Ephemeral), FileCache::getCommonUser());
chassert(segment_holder->size() == 1);
segment_holder->front().getKeyMetadata()->createBaseDirectory(/* throw_if_failed */true);
}
std::unique_ptr<WriteBuffer> write() override
{
return std::make_unique<WriteBufferToFileSegment>(&segment_holder->front());
}
std::unique_ptr<ReadBuffer> read(size_t buffer_size) const override
{
return std::make_unique<ReadBufferFromFile>(segment_holder->front().getPath(), /* buf_size = */ buffer_size);
}
String describeFilePath() const override
{
return fmt::format("fscache://{}", segment_holder->front().getPath());
}
private:
FileSegmentsHolderPtr segment_holder;
};
class TemporaryFileOnLocalDisk : public TemporaryFileHolder
{
public:
explicit TemporaryFileOnLocalDisk(VolumePtr volume, size_t reserve_size = 0)
: path_to_file("tmp" + toString(UUIDHelpers::generateV4()))
{
LOG_TRACE(getLogger("TemporaryFileOnLocalDisk"), "Creating temporary file '{}'", path_to_file);
if (reserve_size > 0)
{
auto reservation = volume->reserve(reserve_size);
if (!reservation)
{
auto disks = volume->getDisks();
Strings disks_info;
for (const auto & d : disks)
{
auto to_double = [](auto x) { return static_cast<double>(x); };
disks_info.push_back(fmt::format("{}: available: {} unreserved: {}, total: {}, keeping: {}",
d->getName(),
ReadableSize(d->getAvailableSpace().transform(to_double).value_or(NaNOrZero<double>())),
ReadableSize(d->getUnreservedSpace().transform(to_double).value_or(NaNOrZero<double>())),
ReadableSize(d->getTotalSpace().transform(to_double).value_or(NaNOrZero<double>())),
ReadableSize(d->getKeepingFreeSpace())));
}
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE,
"Not enough space on temporary disk, cannot reserve {} bytes on [{}]",
reserve_size, fmt::join(disks_info, ", "));
}
disk = reservation->getDisk();
}
else
{
disk = volume->getDisk();
}
chassert(disk);
}
std::unique_ptr<WriteBuffer> write() override
{
return disk->writeFile(path_to_file);
}
std::unique_ptr<ReadBuffer> read(size_t buffer_size) const override
{
ReadSettings settings;
settings.local_fs_buffer_size = buffer_size;
settings.remote_fs_buffer_size = buffer_size;
settings.prefetch_buffer_size = buffer_size;
return disk->readFile(path_to_file, settings);
}
String describeFilePath() const override
{
return fmt::format("disk({})://{}/{}", disk->getName(), disk->getPath(), path_to_file);
}
~TemporaryFileOnLocalDisk() override
try
{
if (disk->existsFile(path_to_file))
{
LOG_TRACE(getLogger("TemporaryFileOnLocalDisk"), "Removing temporary file '{}'", path_to_file);
disk->removeRecursive(path_to_file);
}
else
{
LOG_WARNING(getLogger("TemporaryFileOnLocalDisk"), "Temporary path '{}' does not exist in '{}' on disk {}", path_to_file, disk->getPath(), disk->getName());
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
private:
DiskPtr disk;
String path_to_file;
};
TemporaryFileProvider createTemporaryFileProvider(VolumePtr volume)
{
if (!volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Volume is not initialized");
return [volume](size_t max_size) -> std::unique_ptr<TemporaryFileHolder>
{
return std::make_unique<TemporaryFileOnLocalDisk>(volume, max_size);
};
}
TemporaryFileProvider createTemporaryFileProvider(FileCache * file_cache)
{
if (!file_cache || !file_cache->isInitialized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "File cache is not initialized");
return [file_cache](size_t max_size) -> std::unique_ptr<TemporaryFileHolder>
{
return std::make_unique<TemporaryFileInLocalCache>(*file_cache, max_size);
};
}
TemporaryDataOnDiskScopePtr TemporaryDataOnDiskScope::childScope(CurrentMetrics::Metric current_metric)
{
TemporaryDataOnDiskSettings child_settings = settings;
child_settings.current_metric = current_metric;
return std::make_shared<TemporaryDataOnDiskScope>(shared_from_this(), child_settings);
}
TemporaryDataReadBuffer::TemporaryDataReadBuffer(std::unique_ptr<ReadBuffer> in_)
: ReadBuffer(nullptr, 0)
, compressed_buf(std::move(in_))
{
BufferBase::set(compressed_buf->buffer().begin(), compressed_buf->buffer().size(), compressed_buf->offset());
}
bool TemporaryDataReadBuffer::nextImpl()
{
compressed_buf->position() = position();
if (!compressed_buf->next())
{
set(compressed_buf->position(), 0);
return false;
}
BufferBase::set(compressed_buf->buffer().begin(), compressed_buf->buffer().size(), compressed_buf->offset());
return true;
}
TemporaryDataBuffer::TemporaryDataBuffer(TemporaryDataOnDiskScope * parent_, size_t reserve_size)
: WriteBuffer(nullptr, 0)
, parent(parent_)
, file_holder(parent->file_provider(reserve_size))
, out_compressed_buf(file_holder->write(), getCodec(parent->getSettings()))
{
WriteBuffer::set(out_compressed_buf->buffer().begin(), out_compressed_buf->buffer().size());
}
void TemporaryDataBuffer::nextImpl()
{
if (!out_compressed_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file buffer writing has been finished");
out_compressed_buf->position() = position();
out_compressed_buf->next();
BufferBase::set(out_compressed_buf->buffer().begin(), out_compressed_buf->buffer().size(), out_compressed_buf->offset());
updateAllocAndCheck();
}
String TemporaryDataBuffer::describeFilePath() const
{
return file_holder->describeFilePath();
}
TemporaryDataBuffer::~TemporaryDataBuffer()
{
if (out_compressed_buf)
// read() nor finishWriting() was called
cancel();
}
void TemporaryDataBuffer::cancelImpl() noexcept
{
if (out_compressed_buf)
{
/// CompressedWriteBuffer doesn't call cancel/finalize for wrapped buffer
out_compressed_buf->cancel();
out_compressed_buf.getHolder()->cancel();
out_compressed_buf.reset();
}
}
void TemporaryDataBuffer::finalizeImpl()
{
if (!out_compressed_buf)
return;
/// CompressedWriteBuffer doesn't call cancel/finalize for wrapped buffer
out_compressed_buf->finalize();
out_compressed_buf.getHolder()->finalize();
updateAllocAndCheck();
out_compressed_buf.reset();
}
TemporaryDataBuffer::Stat TemporaryDataBuffer::finishWriting()
{
/// TemporaryDataBuffer::read can be called from multiple threads
std::call_once(write_finished, [this]
{
if (canceled)
throw Exception(ErrorCodes::INVALID_STATE, "Writing to temporary file buffer was not successful");
next();
finalize();
});
return stat;
}
std::unique_ptr<ReadBuffer> TemporaryDataBuffer::read()
{
finishWriting();
if (stat.compressed_size == 0 && stat.uncompressed_size == 0)
return std::make_unique<TemporaryDataReadBuffer>(std::make_unique<ReadBufferFromEmptyFile>());
/// Keep buffer size less that file size, to avoid memory overhead for large amounts of small files
size_t buffer_size = std::min<size_t>(stat.compressed_size, DBMS_DEFAULT_BUFFER_SIZE);
return std::make_unique<TemporaryDataReadBuffer>(file_holder->read(buffer_size));
}
void TemporaryDataBuffer::updateAllocAndCheck()
{
if (!out_compressed_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file buffer writing has been finished");
size_t new_compressed_size = out_compressed_buf->getCompressedBytes();
size_t new_uncompressed_size = out_compressed_buf->getUncompressedBytes();
if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
file_holder ? file_holder->describeFilePath() : "NULL",
new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
}
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
stat.compressed_size = new_compressed_size;
stat.uncompressed_size = new_uncompressed_size;
}
void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta)
{
@ -54,391 +339,25 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssiz
stat.uncompressed_size += uncompressed_delta;
}
TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(parent_, parent_->getSettings())
TemporaryBlockStreamHolder::TemporaryBlockStreamHolder(const Block & header_, TemporaryDataOnDiskScope * parent_, size_t reserve_size)
: WrapperGuard(std::make_unique<TemporaryDataBuffer>(parent_, reserve_size), DBMS_TCP_PROTOCOL_VERSION, header_)
, header(header_)
{}
TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope)
: TemporaryDataOnDiskScope(parent_, parent_->getSettings())
, current_metric_scope(metric_scope)
{}
std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(size_t max_file_size)
TemporaryDataBuffer::Stat TemporaryBlockStreamHolder::finishWriting() const
{
if (file_cache && file_cache->isInitialized())
{
auto holder = createCacheFile(max_file_size);
return std::make_unique<WriteBufferToFileSegment>(std::move(holder));
}
if (volume)
{
auto tmp_file = createRegularFile(max_file_size);
return std::make_unique<WriteBufferFromTemporaryFile>(std::move(tmp_file));
}
if (!holder)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary block stream is not initialized");
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
impl->flush();
return holder->finishWriting();
}
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
TemporaryBlockStreamReaderHolder TemporaryBlockStreamHolder::getReadStream() const
{
if (file_cache && file_cache->isInitialized())
{
auto holder = createCacheFile(max_file_size);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(holder), header, this));
return *tmp_stream;
}
if (volume)
{
auto tmp_file = createRegularFile(max_file_size);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream
= streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
return *tmp_stream;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
}
FileSegmentsHolderPtr TemporaryDataOnDisk::createCacheFile(size_t max_file_size)
{
if (!file_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache");
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
const auto key = FileSegment::Key::random();
auto holder = file_cache->set(
key, 0, std::max(10_MiB, max_file_size),
CreateFileSegmentSettings(FileSegmentKind::Ephemeral), FileCache::getCommonUser());
chassert(holder->size() == 1);
holder->back().getKeyMetadata()->createBaseDirectory(/* throw_if_failed */true);
return holder;
}
TemporaryFileOnDiskHolder TemporaryDataOnDisk::createRegularFile(size_t max_file_size)
{
if (!volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no volume");
DiskPtr disk;
if (max_file_size > 0)
{
auto reservation = volume->reserve(max_file_size);
if (!reservation)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space on temporary disk");
disk = reservation->getDisk();
}
else
{
disk = volume->getDisk();
}
/// We do not increment ProfileEvents::ExternalProcessingFilesTotal here because it is incremented in TemporaryFileOnDisk constructor.
return std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
}
std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
{
std::vector<TemporaryFileStream *> res;
std::lock_guard lock(mutex);
res.reserve(streams.size());
for (const auto & stream : streams)
res.push_back(stream.get());
return res;
}
bool TemporaryDataOnDisk::empty() const
{
std::lock_guard lock(mutex);
return streams.empty();
}
static inline CompressionCodecPtr getCodec(const TemporaryDataOnDiskSettings & settings)
{
if (settings.compression_codec.empty())
return CompressionCodecFactory::instance().get("NONE");
return CompressionCodecFactory::instance().get(settings.compression_codec);
}
struct TemporaryFileStream::OutputWriter
{
OutputWriter(std::unique_ptr<WriteBuffer> out_buf_, const Block & header_, const TemporaryDataOnDiskSettings & settings)
: out_buf(std::move(out_buf_))
, out_compressed_buf(*out_buf, getCodec(settings))
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
}
size_t write(const Block & block)
{
if (finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized stream");
size_t written_bytes = out_writer.write(block);
num_rows += block.rows();
return written_bytes;
}
void flush()
{
if (finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot flush finalized stream");
out_compressed_buf.next();
out_buf->next();
out_writer.flush();
}
void finalize()
{
if (finalized)
return;
/// if we called finalize() explicitly, and got an exception,
/// we don't want to get it again in the destructor, so set finalized flag first
finalized = true;
out_writer.flush();
out_compressed_buf.finalize();
out_buf->finalize();
}
~OutputWriter()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
std::unique_ptr<WriteBuffer> out_buf;
CompressedWriteBuffer out_compressed_buf;
NativeWriter out_writer;
std::atomic_size_t num_rows = 0;
bool finalized = false;
};
TemporaryFileStream::Reader::Reader(const String & path_, const Block & header_, size_t size_)
: path(path_)
, size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
, header(header_)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
TemporaryFileStream::Reader::Reader(const String & path_, size_t size_)
: path(path_)
, size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
Block TemporaryFileStream::Reader::read()
{
if (!in_reader)
{
if (fs::exists(path))
in_file_buf = std::make_unique<ReadBufferFromFile>(path, size);
else
in_file_buf = std::make_unique<ReadBufferFromEmptyFile>();
in_compressed_buf = std::make_unique<CompressedReadBuffer>(*in_file_buf);
if (header.has_value())
in_reader = std::make_unique<NativeReader>(*in_compressed_buf, header.value(), DBMS_TCP_PROTOCOL_VERSION);
else
in_reader = std::make_unique<NativeReader>(*in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION);
}
return in_reader->read();
}
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header, parent->settings))
{
LOG_TEST(getLogger("TemporaryFileStream"), "Writing to temporary file {}", file->getAbsolutePath());
}
TemporaryFileStream::TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
, header(header_)
, segment_holder(std::move(segments_))
{
if (segment_holder->size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream can be created only from single segment");
auto out_buf = std::make_unique<WriteBufferToFileSegment>(&segment_holder->front());
LOG_TEST(getLogger("TemporaryFileStream"), "Writing to temporary file {}", out_buf->getFileName());
out_writer = std::make_unique<OutputWriter>(std::move(out_buf), header, parent_->settings);
}
size_t TemporaryFileStream::write(const Block & block)
{
if (!out_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been finished");
updateAllocAndCheck();
size_t bytes_written = out_writer->write(block);
return bytes_written;
}
void TemporaryFileStream::flush()
{
if (!out_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been finished");
out_writer->flush();
}
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
if (isWriteFinished())
return stat;
if (out_writer)
{
out_writer->finalize();
/// The amount of written data can be changed after finalization, some buffers can be flushed
/// Need to update the stat
updateAllocAndCheck();
out_writer.reset();
/// reader will be created at the first read call, not to consume memory before it is needed
}
return stat;
}
TemporaryFileStream::Stat TemporaryFileStream::finishWritingAsyncSafe()
{
std::call_once(finish_writing, [this]{ finishWriting(); });
return stat;
}
bool TemporaryFileStream::isWriteFinished() const
{
assert(in_reader == nullptr || out_writer == nullptr);
return out_writer == nullptr;
}
Block TemporaryFileStream::read()
{
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
if (isEof())
return {};
if (!in_reader)
{
in_reader = std::make_unique<Reader>(getPath(), header, getSize());
}
Block block = in_reader->read();
if (!block)
{
/// finalize earlier to release resources, do not wait for the destructor
this->release();
}
return block;
}
std::unique_ptr<TemporaryFileStream::Reader> TemporaryFileStream::getReadStream()
{
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
if (isEof())
return nullptr;
return std::make_unique<Reader>(getPath(), header, getSize());
}
void TemporaryFileStream::updateAllocAndCheck()
{
assert(out_writer);
size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes();
size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes();
if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
getPath(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
}
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
stat.compressed_size = new_compressed_size;
stat.uncompressed_size = new_uncompressed_size;
stat.num_rows = out_writer->num_rows;
}
bool TemporaryFileStream::isEof() const
{
return file == nullptr && !segment_holder;
}
void TemporaryFileStream::release()
{
if (in_reader)
in_reader.reset();
if (out_writer)
{
out_writer->finalize();
out_writer.reset();
}
if (file)
{
file.reset();
parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
}
if (segment_holder)
segment_holder.reset();
}
String TemporaryFileStream::getPath() const
{
if (file)
return file->getAbsolutePath();
if (segment_holder && !segment_holder->empty())
return segment_holder->front().getPath();
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream has no file");
}
size_t TemporaryFileStream::getSize() const
{
if (file)
return file->getDisk()->getFileSize(file->getRelativePath());
if (segment_holder && !segment_holder->empty())
return segment_holder->front().getReservedSize();
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream has no file");
}
TemporaryFileStream::~TemporaryFileStream()
{
try
{
release();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(false); /// deltaAllocAndCheck with negative can't throw exception
}
if (!holder)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary block stream is not initialized");
return TemporaryBlockStreamReaderHolder(holder->read(), header, DBMS_TCP_PROTOCOL_VERSION);
}
}

View File

@ -4,15 +4,21 @@
#include <mutex>
#include <boost/noncopyable.hpp>
#include <IO/ReadBufferFromFile.h>
#include <Common/CurrentMetrics.h>
#include <Compression/CompressedReadBuffer.h>
#include <Formats/NativeReader.h>
#include <Core/Block.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Common/CurrentMetrics.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Interpreters/Cache/FileSegment.h>
#include <IO/ReadBufferFromFile.h>
class FileCacheTest_TemporaryDataReadBufferSize_Test;
namespace CurrentMetrics
{
@ -25,11 +31,10 @@ namespace DB
class TemporaryDataOnDiskScope;
using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
class TemporaryDataOnDisk;
using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
class TemporaryDataBuffer;
using TemporaryDataBufferPtr = std::unique_ptr<TemporaryDataBuffer>;
class TemporaryFileStream;
using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
class TemporaryFileHolder;
class FileCache;
@ -40,15 +45,26 @@ struct TemporaryDataOnDiskSettings
/// Compression codec for temporary data, if empty no compression will be used. LZ4 by default
String compression_codec = "LZ4";
/// Read/Write internal buffer size
size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
/// Metrics counter to increment when temporary file in current scope are created
CurrentMetrics::Metric current_metric = CurrentMetrics::TemporaryFilesUnknown;
};
/// Creates temporary files located on specified resource (disk, fs_cache, etc.)
using TemporaryFileProvider = std::function<std::unique_ptr<TemporaryFileHolder>(size_t)>;
TemporaryFileProvider createTemporaryFileProvider(VolumePtr volume);
TemporaryFileProvider createTemporaryFileProvider(FileCache * file_cache);
/*
* Used to account amount of temporary data written to disk.
* If limit is set, throws exception if limit is exceeded.
* Data can be nested, so parent scope accounts all data written by children.
* Scopes are: global -> per-user -> per-query -> per-purpose (sorting, aggregation, etc).
*/
class TemporaryDataOnDiskScope : boost::noncopyable
class TemporaryDataOnDiskScope : boost::noncopyable, public std::enable_shared_from_this<TemporaryDataOnDiskScope>
{
public:
struct StatAtomic
@ -57,164 +73,156 @@ public:
std::atomic<size_t> uncompressed_size;
};
explicit TemporaryDataOnDiskScope(VolumePtr volume_, TemporaryDataOnDiskSettings settings_)
: volume(std::move(volume_))
/// Root scope
template <typename T>
TemporaryDataOnDiskScope(T && storage, TemporaryDataOnDiskSettings settings_)
: file_provider(createTemporaryFileProvider(std::forward<T>(storage)))
, settings(std::move(settings_))
{}
explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * file_cache_, TemporaryDataOnDiskSettings settings_)
: volume(std::move(volume_))
, file_cache(file_cache_)
, settings(std::move(settings_))
{}
explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, TemporaryDataOnDiskSettings settings_)
TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, TemporaryDataOnDiskSettings settings_)
: parent(std::move(parent_))
, volume(parent->volume)
, file_cache(parent->file_cache)
, file_provider(parent->file_provider)
, settings(std::move(settings_))
{}
/// TODO: remove
/// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
VolumePtr getVolume() const { return volume; }
TemporaryDataOnDiskScopePtr childScope(CurrentMetrics::Metric current_metric);
const TemporaryDataOnDiskSettings & getSettings() const { return settings; }
protected:
friend class TemporaryDataBuffer;
void deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta);
TemporaryDataOnDiskScopePtr parent = nullptr;
VolumePtr volume = nullptr;
FileCache * file_cache = nullptr;
TemporaryFileProvider file_provider;
StatAtomic stat;
const TemporaryDataOnDiskSettings settings;
};
/*
* Holds the set of temporary files.
* New file stream is created with `createStream`.
* Streams are owned by this object and will be deleted when it is deleted.
* It's a leaf node in temporary data scope tree.
*/
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
/** Used to hold the wrapper and wrapped object together.
* This class provides a convenient way to manage the lifetime of both the wrapper and the wrapped object.
* The wrapper class (Impl) stores a reference to the wrapped object (Holder), and both objects are owned by this class.
* The lifetime of the wrapper and the wrapped object should be the same.
* This pattern is commonly used when the caller only needs to interact with the wrapper and doesn't need to be aware of the wrapped object.
* Examples: CompressedWriteBuffer and WriteBuffer, and NativeReader and ReadBuffer.
*/
template <typename Impl, typename Holder>
class WrapperGuard
{
friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
public:
using TemporaryDataOnDiskScope::StatAtomic;
template <typename ... Args>
explicit WrapperGuard(std::unique_ptr<Holder> holder_, Args && ... args)
: holder(std::move(holder_))
, impl(std::make_unique<Impl>(*holder, std::forward<Args>(args)...))
{
chassert(holder);
chassert(impl);
}
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_);
Impl * operator->() { chassert(impl); chassert(holder); return impl.get(); }
const Impl * operator->() const { chassert(impl); chassert(holder); return impl.get(); }
Impl & operator*() { chassert(impl); chassert(holder); return *impl; }
const Impl & operator*() const { chassert(impl); chassert(holder); return *impl; }
operator bool() const { return impl != nullptr; } /// NOLINT
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope);
const Holder * getHolder() const { return holder.get(); }
Holder * getHolder() { return holder.get(); }
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
void reset()
{
impl.reset();
holder.reset();
}
/// Write raw data directly into buffer.
/// Differences from `createStream`:
/// 1) it doesn't account data in parent scope
/// 2) returned buffer owns resources (instead of TemporaryDataOnDisk itself)
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
std::unique_ptr<WriteBufferFromFileBase> createRawStream(size_t max_file_size = 0);
std::vector<TemporaryFileStream *> getStreams() const;
bool empty() const;
const StatAtomic & getStat() const { return stat; }
private:
FileSegmentsHolderPtr createCacheFile(size_t max_file_size);
TemporaryFileOnDiskHolder createRegularFile(size_t max_file_size);
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
typename CurrentMetrics::Metric current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
protected:
std::unique_ptr<Holder> holder;
std::unique_ptr<Impl> impl;
};
/*
* Data can be written into this stream and then read.
* After finish writing, call `finishWriting` and then either call `read` or 'getReadStream'(only one of the two) to read the data.
* Account amount of data written to disk in parent scope.
*/
class TemporaryFileStream : boost::noncopyable
/// Owns temporary file and provides access to it.
/// On destruction, file is removed and all resources are freed.
/// Lifetime of read/write buffers should be less than lifetime of TemporaryFileHolder.
class TemporaryFileHolder
{
public:
struct Reader
{
Reader(const String & path, const Block & header_, size_t size = 0);
TemporaryFileHolder();
explicit Reader(const String & path, size_t size = 0);
virtual std::unique_ptr<WriteBuffer> write() = 0;
virtual std::unique_ptr<ReadBuffer> read(size_t buffer_size) const = 0;
Block read();
/// Get location for logging
virtual String describeFilePath() const = 0;
const std::string path;
const size_t size;
const std::optional<Block> header;
virtual ~TemporaryFileHolder() = default;
};
std::unique_ptr<ReadBufferFromFileBase> in_file_buf;
std::unique_ptr<CompressedReadBuffer> in_compressed_buf;
std::unique_ptr<NativeReader> in_reader;
};
/// Reads raw data from temporary file
class TemporaryDataReadBuffer : public ReadBuffer
{
public:
explicit TemporaryDataReadBuffer(std::unique_ptr<ReadBuffer> in_);
private:
friend class ::FileCacheTest_TemporaryDataReadBufferSize_Test;
bool nextImpl() override;
WrapperGuard<CompressedReadBuffer, ReadBuffer> compressed_buf;
};
/// Writes raw data to buffer provided by file_holder, and accounts amount of written data in parent scope.
class TemporaryDataBuffer : public WriteBuffer
{
public:
struct Stat
{
/// Statistics for file
/// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
size_t num_rows = 0;
};
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_);
size_t write(const Block & block);
void flush();
explicit TemporaryDataBuffer(TemporaryDataOnDiskScope * parent_, size_t reserve_size = 0);
void nextImpl() override;
void finalizeImpl() override;
void cancelImpl() noexcept override;
std::unique_ptr<ReadBuffer> read();
Stat finishWriting();
Stat finishWritingAsyncSafe();
bool isWriteFinished() const;
std::unique_ptr<Reader> getReadStream();
String describeFilePath() const;
Block read();
String getPath() const;
size_t getSize() const;
Block getHeader() const { return header; }
/// Read finished and file released
bool isEof() const;
~TemporaryFileStream();
~TemporaryDataBuffer() override;
private:
void updateAllocAndCheck();
/// Release everything, close reader and writer, delete file
void release();
TemporaryDataOnDisk * parent;
Block header;
/// Data can be stored in file directly or in the cache
TemporaryFileOnDiskHolder file;
FileSegmentsHolderPtr segment_holder;
TemporaryDataOnDiskScope * parent;
std::unique_ptr<TemporaryFileHolder> file_holder;
WrapperGuard<CompressedWriteBuffer, WriteBuffer> out_compressed_buf;
std::once_flag write_finished;
Stat stat;
};
std::once_flag finish_writing;
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
/// High level interfaces for reading and writing temporary data by blocks.
using TemporaryBlockStreamReaderHolder = WrapperGuard<NativeReader, ReadBuffer>;
std::unique_ptr<Reader> in_reader;
class TemporaryBlockStreamHolder : public WrapperGuard<NativeWriter, TemporaryDataBuffer>
{
public:
TemporaryBlockStreamHolder(const Block & header_, TemporaryDataOnDiskScope * parent_, size_t reserve_size = 0);
TemporaryBlockStreamReaderHolder getReadStream() const;
TemporaryDataBuffer::Stat finishWriting() const;
const Block & getHeader() const { return header; }
private:
Block header;
};
}

View File

@ -934,7 +934,7 @@ static Block generateBlock(size_t size = 0)
return block;
}
static size_t readAllTemporaryData(TemporaryFileStream & stream)
static size_t readAllTemporaryData(NativeReader & stream)
{
Block block;
size_t read_rows = 0;
@ -947,6 +947,7 @@ static size_t readAllTemporaryData(TemporaryFileStream & stream)
}
TEST_F(FileCacheTest, temporaryData)
try
{
ServerUUID::setRandomForUnitTests();
DB::FileCacheSettings settings;
@ -959,7 +960,7 @@ TEST_F(FileCacheTest, temporaryData)
file_cache.initialize();
const auto user = FileCache::getCommonUser();
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, TemporaryDataOnDiskSettings{});
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(&file_cache, TemporaryDataOnDiskSettings{});
auto some_data_holder = file_cache.getOrSet(FileCacheKey::fromPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
@ -982,12 +983,17 @@ TEST_F(FileCacheTest, temporaryData)
size_t size_used_with_temporary_data;
size_t segments_used_with_temporary_data;
{
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
ASSERT_TRUE(stream);
/// Do nothing with stream, just create it and destroy.
}
auto & stream = tmp_data->createStream(generateBlock());
ASSERT_GT(stream.write(generateBlock(100)), 0);
{
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
ASSERT_GT(stream->write(generateBlock(100)), 0);
ASSERT_GT(file_cache.getUsedCacheSize(), 0);
ASSERT_GT(file_cache.getFileSegmentsNum(), 0);
@ -995,22 +1001,22 @@ TEST_F(FileCacheTest, temporaryData)
size_t used_size_before_attempt = file_cache.getUsedCacheSize();
/// data can't be evicted because it is still held by `some_data_holder`
ASSERT_THROW({
stream.write(generateBlock(2000));
stream.flush();
stream->write(generateBlock(2000));
stream.finishWriting();
}, DB::Exception);
ASSERT_THROW(stream.finishWriting(), DB::Exception);
ASSERT_EQ(file_cache.getUsedCacheSize(), used_size_before_attempt);
}
{
size_t before_used_size = file_cache.getUsedCacheSize();
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto write_buf_stream = tmp_data->createRawStream();
auto write_buf_stream = std::make_unique<TemporaryDataBuffer>(tmp_data_scope.get());
write_buf_stream->write("1234567890", 10);
write_buf_stream->write("abcde", 5);
auto read_buf = dynamic_cast<IReadableWriteBuffer *>(write_buf_stream.get())->tryGetReadBuffer();
auto read_buf = write_buf_stream->read();
ASSERT_GT(file_cache.getUsedCacheSize(), before_used_size + 10);
@ -1023,22 +1029,22 @@ TEST_F(FileCacheTest, temporaryData)
}
{
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto & stream = tmp_data->createStream(generateBlock());
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
ASSERT_GT(stream.write(generateBlock(100)), 0);
ASSERT_GT(stream->write(generateBlock(100)), 0);
some_data_holder.reset();
stream.write(generateBlock(2000));
stream->write(generateBlock(2000));
auto stat = stream.finishWriting();
stream.finishWriting();
ASSERT_TRUE(fs::exists(stream.getPath()));
ASSERT_GT(fs::file_size(stream.getPath()), 100);
String file_path = stream.getHolder()->describeFilePath().substr(strlen("fscache://"));
ASSERT_EQ(stat.num_rows, 2100);
ASSERT_EQ(readAllTemporaryData(stream), 2100);
ASSERT_TRUE(fs::exists(file_path)) << "File " << file_path << " should exist";
ASSERT_GT(fs::file_size(file_path), 100) << "File " << file_path << " should be larger than 100 bytes";
ASSERT_EQ(readAllTemporaryData(*stream.getReadStream()), 2100);
size_used_with_temporary_data = file_cache.getUsedCacheSize();
segments_used_with_temporary_data = file_cache.getFileSegmentsNum();
@ -1054,6 +1060,11 @@ TEST_F(FileCacheTest, temporaryData)
ASSERT_LE(file_cache.getUsedCacheSize(), size_used_before_temporary_data);
ASSERT_LE(file_cache.getFileSegmentsNum(), segments_used_before_temporary_data);
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << std::endl;
throw;
}
TEST_F(FileCacheTest, CachedReadBuffer)
{
@ -1148,18 +1159,22 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
DB::FileCache file_cache("cache", settings);
file_cache.initialize();
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/nullptr, &file_cache, /*settings=*/TemporaryDataOnDiskSettings{});
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(&file_cache, TemporaryDataOnDiskSettings{});
auto block = generateBlock(/*size=*/3);
auto & stream = tmp_data->createStream(block);
stream.write(block);
stream.finishWriting();
TemporaryBlockStreamHolder stream(block, tmp_data_scope.get());
/// We allocate buffer of size min(getSize(), DBMS_DEFAULT_BUFFER_SIZE)
stream->write(block);
auto stat = stream.finishWriting();
/// We allocate buffer of size min(stat.compressed_size, DBMS_DEFAULT_BUFFER_SIZE)
/// We do care about buffer size because realistic external group by could generate 10^5 temporary files
ASSERT_EQ(stream.getSize(), 62);
ASSERT_EQ(stat.compressed_size, 62);
auto reader = stream.getReadStream();
auto * read_buf = reader.getHolder();
const auto & internal_buffer = static_cast<TemporaryDataReadBuffer *>(read_buf)->compressed_buf.getHolder()->internalBuffer();
ASSERT_EQ(internal_buffer.size(), 62);
}
/// Temporary data stored on disk
@ -1170,16 +1185,14 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
disk = createDisk("temporary_data_read_buffer_size_test_dir");
VolumePtr volume = std::make_shared<SingleDiskVolume>("volume", disk);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/volume, /*cache=*/nullptr, /*settings=*/TemporaryDataOnDiskSettings{});
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(volume, TemporaryDataOnDiskSettings{});
auto block = generateBlock(/*size=*/3);
auto & stream = tmp_data->createStream(block);
stream.write(block);
stream.finishWriting();
TemporaryBlockStreamHolder stream(block, tmp_data_scope.get());
stream->write(block);
auto stat = stream.finishWriting();
ASSERT_EQ(stream.getSize(), 62);
ASSERT_EQ(stat.compressed_size, 62);
}
}

View File

@ -31,7 +31,7 @@ CreateQueryUUIDs::CreateQueryUUIDs(const ASTCreateQuery & query, bool generate_r
/// If we generate random UUIDs for already existing tables then those UUIDs will not be correct making those inner target table inaccessible.
/// Thus it's not safe for example to replace
/// "ATTACH MATERIALIZED VIEW mv AS SELECT a FROM b" with
/// "ATTACH MATERIALIZED VIEW mv TO INNER UUID "XXXX" AS SELECT a FROM b"
/// "ATTACH MATERIALIZED VIEW mv TO INNER UUID "248372b7-02c4-4c88-a5e1-282a83cc572a" AS SELECT a FROM b"
/// This replacement is safe only for CREATE queries when inner target tables don't exist yet.
if (!query.attach)
{

View File

@ -282,9 +282,9 @@ void SortingStep::mergeSorting(
if (increase_sort_description_compile_attempts)
increase_sort_description_compile_attempts = false;
auto tmp_data_on_disk = sort_settings.tmp_data
? std::make_unique<TemporaryDataOnDisk>(sort_settings.tmp_data, CurrentMetrics::TemporaryFilesForSort)
: std::unique_ptr<TemporaryDataOnDisk>();
TemporaryDataOnDiskScopePtr tmp_data_on_disk = nullptr;
if (sort_settings.tmp_data)
tmp_data_on_disk = sort_settings.tmp_data->childScope(CurrentMetrics::TemporaryFilesForSort);
return std::make_shared<MergeSortingTransform>(
header,

View File

@ -54,9 +54,9 @@ namespace
class SourceFromNativeStream : public ISource
{
public:
explicit SourceFromNativeStream(TemporaryFileStream * tmp_stream_)
: ISource(tmp_stream_->getHeader())
, tmp_stream(tmp_stream_)
explicit SourceFromNativeStream(const Block & header, TemporaryBlockStreamReaderHolder tmp_stream_)
: ISource(header)
, tmp_stream(std::move(tmp_stream_))
{}
String getName() const override { return "SourceFromNativeStream"; }
@ -69,7 +69,7 @@ namespace
auto block = tmp_stream->read();
if (!block)
{
tmp_stream = nullptr;
tmp_stream.reset();
return {};
}
return convertToChunk(block);
@ -78,7 +78,7 @@ namespace
std::optional<ReadProgress> getReadProgress() override { return std::nullopt; }
private:
TemporaryFileStream * tmp_stream;
TemporaryBlockStreamReaderHolder tmp_stream;
};
}
@ -811,15 +811,18 @@ void AggregatingTransform::initGenerate()
Pipes pipes;
/// Merge external data from all aggregators used in query.
for (const auto & aggregator : *params->aggregator_list_ptr)
for (auto & aggregator : *params->aggregator_list_ptr)
{
const auto & tmp_data = aggregator.getTemporaryData();
for (auto * tmp_stream : tmp_data.getStreams())
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
tmp_files = aggregator.detachTemporaryData();
num_streams += tmp_files.size();
num_streams += tmp_data.getStreams().size();
compressed_size += tmp_data.getStat().compressed_size;
uncompressed_size += tmp_data.getStat().uncompressed_size;
for (auto & tmp_stream : tmp_files)
{
auto stat = tmp_stream.finishWriting();
compressed_size += stat.compressed_size;
uncompressed_size += stat.uncompressed_size;
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream.getHeader(), tmp_stream.getReadStream())));
}
}
LOG_DEBUG(

View File

@ -216,6 +216,8 @@ private:
RowsBeforeStepCounterPtr rows_before_aggregation;
std::list<TemporaryBlockStreamHolder> tmp_files;
void initGenerate();
};

View File

@ -27,15 +27,20 @@ namespace ProfileEvents
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class BufferingToFileTransform : public IAccumulatingTransform
{
public:
BufferingToFileTransform(const Block & header, TemporaryFileStream & tmp_stream_, LoggerPtr log_)
BufferingToFileTransform(const Block & header, TemporaryBlockStreamHolder tmp_stream_, LoggerPtr log_)
: IAccumulatingTransform(header, header)
, tmp_stream(tmp_stream_)
, tmp_stream(std::move(tmp_stream_))
, log(log_)
{
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.getPath());
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.getHolder()->describeFilePath());
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
}
@ -44,14 +49,15 @@ public:
void consume(Chunk chunk) override
{
Block block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
tmp_stream.write(block);
tmp_stream->write(block);
}
Chunk generate() override
{
if (!tmp_stream.isWriteFinished())
if (!tmp_read_stream)
{
auto stat = tmp_stream.finishWriting();
tmp_read_stream = tmp_stream.getReadStream();
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
@ -59,10 +65,11 @@ public:
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size);
LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
tmp_stream.getPath(), ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
tmp_stream.getHolder()->describeFilePath(),
ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
}
Block block = tmp_stream.read();
Block block = tmp_read_stream.value()->read();
if (!block)
return {};
@ -71,7 +78,8 @@ public:
}
private:
TemporaryFileStream & tmp_stream;
TemporaryBlockStreamHolder tmp_stream;
std::optional<TemporaryBlockStreamReaderHolder> tmp_read_stream;
LoggerPtr log;
};
@ -86,7 +94,7 @@ MergeSortingTransform::MergeSortingTransform(
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
TemporaryDataOnDiskPtr tmp_data_,
TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_)
: SortingTransform(header, description_, max_merged_block_size_, limit_, increase_sort_description_compile_attempts)
, max_bytes_before_remerge(max_bytes_before_remerge_)
@ -168,9 +176,13 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
if (!tmp_data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDisk is not set for MergeSortingTransform");
temporary_files_num++;
/// If there's less free disk space than reserve_size, an exception will be thrown
size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
auto & tmp_stream = tmp_data->createStream(header_without_constants, reserve_size);
TemporaryBlockStreamHolder tmp_stream(header_without_constants, tmp_data.get(), reserve_size);
size_t max_merged_block_size = this->max_merged_block_size;
if (max_block_bytes > 0 && sum_rows_in_blocks > 0 && sum_bytes_in_blocks > 0)
{
@ -179,7 +191,7 @@ void MergeSortingTransform::consume(Chunk chunk)
max_merged_block_size = std::max(std::min(max_merged_block_size, max_block_bytes / avg_row_bytes), 128UL);
}
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, std::move(tmp_stream), log);
processors.emplace_back(current_processor);
@ -223,14 +235,14 @@ void MergeSortingTransform::generate()
{
if (!generated_prefix)
{
size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0;
if (num_tmp_files == 0)
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
if (temporary_files_num == 0)
{
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
}
else
{
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are {} temporary sorted parts to merge", num_tmp_files);
LOG_INFO(log, "There are {} temporary sorted parts to merge", temporary_files_num);
processors.emplace_back(std::make_shared<MergeSorterSource>(
header_without_constants, std::move(chunks), description, max_merged_block_size, limit));

View File

@ -29,7 +29,7 @@ public:
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
TemporaryDataOnDiskPtr tmp_data_,
TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSortingTransform"; }
@ -45,7 +45,8 @@ private:
size_t max_bytes_before_remerge;
double remerge_lowered_memory_bytes_ratio;
size_t max_bytes_before_external_sort;
TemporaryDataOnDiskPtr tmp_data;
TemporaryDataOnDiskScopePtr tmp_data;
size_t temporary_files_num = 0;
size_t min_free_disk_space;
size_t max_block_bytes;

View File

@ -170,15 +170,16 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
for (auto & write_buf : write_buffers)
{
if (!write_buf)
continue;
IReadableWriteBuffer * write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get());
if (write_buf_concrete)
if (auto * write_buf_concrete = dynamic_cast<TemporaryDataBuffer *>(write_buf.get()))
{
ReadBufferPtr reread_buf = write_buf_concrete->tryGetReadBuffer();
if (reread_buf)
read_buffers.emplace_back(wrapReadBufferPointer(reread_buf));
if (auto reread_buf = write_buf_concrete->read())
read_buffers.emplace_back(std::move(reread_buf));
}
if (auto * write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get()))
{
if (auto reread_buf = write_buf_concrete->tryGetReadBuffer())
read_buffers.emplace_back(std::move(reread_buf));
}
}
@ -321,21 +322,19 @@ void HTTPHandler::processQuery(
if (buffer_size_memory > 0 || buffer_until_eof)
{
CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2;
CascadeWriteBuffer::WriteBufferPtrs cascade_buffers;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffers_lazy;
if (buffer_size_memory > 0)
cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));
cascade_buffers.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));
if (buffer_until_eof)
{
auto tmp_data = std::make_shared<TemporaryDataOnDisk>(server.context()->getTempDataOnDisk());
auto create_tmp_disk_buffer = [tmp_data] (const WriteBufferPtr &) -> WriteBufferPtr {
return tmp_data->createRawStream();
};
cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));
auto tmp_data = server.context()->getTempDataOnDisk();
cascade_buffers_lazy.emplace_back([tmp_data](const WriteBufferPtr &) -> WriteBufferPtr
{
return std::make_unique<TemporaryDataBuffer>(tmp_data.get());
});
}
else
{
@ -351,10 +350,10 @@ void HTTPHandler::processQuery(
return next_buffer;
};
cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
cascade_buffers_lazy.emplace_back(push_memory_buffer_and_continue);
}
used_output.out_delayed_and_compressed_holder = std::make_unique<CascadeWriteBuffer>(std::move(cascade_buffer1), std::move(cascade_buffer2));
used_output.out_delayed_and_compressed_holder = std::make_unique<CascadeWriteBuffer>(std::move(cascade_buffers), std::move(cascade_buffers_lazy));
used_output.out_maybe_delayed_and_compressed = used_output.out_delayed_and_compressed_holder.get();
}
else

View File

@ -65,6 +65,11 @@ namespace ProfileEvents
extern const Event MergeProjectionStageExecuteMilliseconds;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForMerge;
}
namespace DB
{
namespace Setting
@ -124,6 +129,7 @@ static ColumnsStatistics getStatisticsForColumns(
return all_statistics;
}
/// Manages the "rows_sources" temporary file that is used during vertical merge.
class RowsSourcesTemporaryFile : public ITemporaryFileLookup
{
@ -132,9 +138,7 @@ public:
static constexpr auto FILE_ID = "rows_sources";
explicit RowsSourcesTemporaryFile(TemporaryDataOnDiskScopePtr temporary_data_on_disk_)
: tmp_disk(std::make_unique<TemporaryDataOnDisk>(temporary_data_on_disk_))
, uncompressed_write_buffer(tmp_disk->createRawStream())
, tmp_file_name_on_disk(uncompressed_write_buffer->getFileName())
: temporary_data_on_disk(temporary_data_on_disk_->childScope(CurrentMetrics::TemporaryFilesForMerge))
{
}
@ -143,11 +147,11 @@ public:
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (write_buffer)
if (tmp_data_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there musto be only one writer");
write_buffer = (std::make_unique<CompressedWriteBuffer>(*uncompressed_write_buffer));
return *write_buffer;
tmp_data_buffer = std::make_unique<TemporaryDataBuffer>(temporary_data_on_disk.get());
return *tmp_data_buffer;
}
std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & name) override
@ -163,25 +167,24 @@ public:
return std::make_unique<ReadBufferFromEmptyFile>();
/// Reopen the file for each read so that multiple reads can be performed in parallel and there is no need to seek to the beginning.
auto raw_file_read_buffer = std::make_unique<ReadBufferFromFile>(tmp_file_name_on_disk);
return std::make_unique<CompressedReadBufferFromFile>(std::move(raw_file_read_buffer));
return tmp_data_buffer->read();
}
/// Returns written data size in bytes
size_t finalizeWriting()
{
write_buffer->finalize();
uncompressed_write_buffer->finalize();
if (!tmp_data_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was not requested for writing");
auto stat = tmp_data_buffer->finishWriting();
finalized = true;
final_size = write_buffer->count();
final_size = stat.uncompressed_size;
return final_size;
}
private:
std::unique_ptr<TemporaryDataOnDisk> tmp_disk;
std::unique_ptr<WriteBufferFromFileBase> uncompressed_write_buffer;
std::unique_ptr<WriteBuffer> write_buffer;
const String tmp_file_name_on_disk;
std::unique_ptr<TemporaryDataBuffer> tmp_data_buffer;
TemporaryDataOnDiskScopePtr temporary_data_on_disk;
bool finalized = false;
size_t final_size = 0;
};
@ -874,6 +877,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
/// Note that only one byte index is written for each row, so number of rows is equals to the number of bytes written.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1) && sum_input_rows_exact != rows_sources_count + input_rows_filtered)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -881,6 +885,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
ctx->it_name_and_type = global_ctx->gathering_columns.cbegin();
const auto & settings = global_ctx->context->getSettingsRef();
@ -1718,7 +1723,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
sort_description,
partition_key_columns,
global_ctx->merging_params,
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources' temporary file is used only for vertical merge
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources' temporary file is used only for vertical merge
(*data_settings)[MergeTreeSetting::merge_max_block_size],
(*data_settings)[MergeTreeSetting::merge_max_block_size_bytes],
ctx->blocks_are_granules_size,

View File

@ -35,8 +35,8 @@ def test_disk_selection(start_cluster):
node.query(query, settings=settings)
assert node.contains_in_log(
"Writing part of aggregation data into temporary file /disk1/"
"Writing part of aggregation data into temporary file.*/disk1/"
)
assert node.contains_in_log(
"Writing part of aggregation data into temporary file /disk2/"
"Writing part of aggregation data into temporary file.*/disk2/"
)

View File

@ -77,7 +77,8 @@ SELECT
'ok',
'fail: ' || toString(count()) || ' ' || toString(any(ProfileEvents))
)
FROM system.query_log WHERE current_database = currentDatabase()
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = '02402_external_disk_mertrics/join'
AND query ILIKE 'SELECT%2097152%' AND type = 'QueryFinish';