Refactoring TempDataOnDisk

commit d10b79020e (parent 71df65c288)
@@ -45,6 +45,7 @@
     M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
     M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
     M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
+    M(TemporaryFilesForMerge, "Number of temporary files for vertical merge") \
     M(TemporaryFilesUnknown, "Number of temporary files created without known purpose") \
     M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
     M(RemoteRead, "Number of read with remote reader in fly") \
@@ -69,7 +69,7 @@ static void testCascadeBufferRedability(
     auto rbuf = wbuf_readable.tryGetReadBuffer();
     ASSERT_FALSE(!rbuf);
 
-    concat.appendBuffer(wrapReadBufferPointer(std::move(rbuf)));
+    concat.appendBuffer(std::move(rbuf));
 }
 
 std::string decoded_data;
@@ -335,7 +335,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
    : header(header_)
    , keys_positions(calculateKeysPositions(header, params_))
    , params(params_)
-    , tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope, CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
+    , tmp_data(params.tmp_data_scope ? params.tmp_data_scope->childScope(CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
    , min_bytes_for_prefetch(getMinBytesForPrefetch())
 {
     /// Use query-level memory tracker
@@ -1519,10 +1519,10 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
     Stopwatch watch;
     size_t rows = data_variants.size();
 
-    auto & out_stream = tmp_data->createStream(getHeader(false), max_temp_file_size);
+    auto & out_stream = tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size);
     ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
 
-    LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.getPath());
+    LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.getHolder()->describeFilePath());
 
     /// Flush only two-level data and possibly overflow data.
 
@@ -1643,7 +1643,7 @@ template <typename Method>
 void Aggregator::writeToTemporaryFileImpl(
     AggregatedDataVariants & data_variants,
     Method & method,
-    TemporaryFileStream & out) const
+    TemporaryBlockStreamHolder & out) const
 {
     size_t max_temporary_block_size_rows = 0;
     size_t max_temporary_block_size_bytes = 0;
@@ -1660,14 +1660,14 @@ void Aggregator::writeToTemporaryFileImpl(
     for (UInt32 bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
     {
         Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
-        out.write(block);
+        out->write(block);
         update_max_sizes(block);
     }
 
     if (params.overflow_row)
     {
         Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
-        out.write(block);
+        out->write(block);
         update_max_sizes(block);
     }
 
@@ -309,9 +309,9 @@ public:
     /// For external aggregation.
     void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const;
 
-    bool hasTemporaryData() const { return tmp_data && !tmp_data->empty(); }
+    bool hasTemporaryData() const { return !tmp_files.empty(); }
 
-    const TemporaryDataOnDisk & getTemporaryData() const { return *tmp_data; }
+    std::vector<TemporaryBlockStreamHolder> & getTemporaryData() { return tmp_files; }
 
     /// Get data structure of the result.
     Block getHeader(bool final) const;
@@ -355,7 +355,8 @@ private:
     LoggerPtr log = getLogger("Aggregator");
 
     /// For external aggregation.
-    TemporaryDataOnDiskPtr tmp_data;
+    TemporaryDataOnDiskScopePtr tmp_data;
+    mutable std::vector<TemporaryBlockStreamHolder> tmp_files;
 
     size_t min_bytes_for_prefetch = 0;
 
@@ -456,7 +457,7 @@ private:
     void writeToTemporaryFileImpl(
         AggregatedDataVariants & data_variants,
         Method & method,
-        TemporaryFileStream & out) const;
+        TemporaryBlockStreamHolder & out) const;
 
     /// Merge NULL key data from hash table `src` into `dst`.
     template <typename Method, typename Table>
@@ -353,6 +353,8 @@ struct ContextSharedPart : boost::noncopyable
     /// Child scopes for more fine-grained accounting are created per user/query/etc.
     /// Initialized once during server startup.
     TemporaryDataOnDiskScopePtr root_temp_data_on_disk TSA_GUARDED_BY(mutex);
+    /// TODO: remove, use only root_temp_data_on_disk
+    VolumePtr temporary_volume_legacy;
 
     mutable OnceFlag async_loader_initialized;
     mutable std::unique_ptr<AsyncLoader> async_loader; /// Thread pool for asynchronous initialization of arbitrary DAG of `LoadJob`s (used for tables loading)
@@ -783,10 +785,9 @@ struct ContextSharedPart : boost::noncopyable
     }
 
     /// Special volumes might also use disks that require shutdown.
-    auto & tmp_data = root_temp_data_on_disk;
-    if (tmp_data && tmp_data->getVolume())
+    if (temporary_volume_legacy)
     {
-        auto & disks = tmp_data->getVolume()->getDisks();
+        auto & disks = temporary_volume_legacy->getDisks();
         for (auto & disk : disks)
             disk->shutdown();
     }
@@ -1166,8 +1167,8 @@ VolumePtr Context::getGlobalTemporaryVolume() const
     SharedLockGuard lock(shared->mutex);
     /// Calling this method we just bypass the `temp_data_on_disk` and write to the file on the volume directly.
     /// Volume is the same for `root_temp_data_on_disk` (always set) and `temp_data_on_disk` (if it's set).
-    if (shared->root_temp_data_on_disk)
-        return shared->root_temp_data_on_disk->getVolume();
+    if (shared->temporary_volume_legacy)
+        return shared->temporary_volume_legacy;
     return nullptr;
 }
 
@@ -1288,7 +1289,8 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
 
     TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
     temporary_data_on_disk_settings.max_size_on_disk = max_size;
-    shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), std::move(temporary_data_on_disk_settings));
+    shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, std::move(temporary_data_on_disk_settings));
+    shared->temporary_volume_legacy = volume;
 }
 
 void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size)
@@ -1336,7 +1338,8 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
 
     TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
     temporary_data_on_disk_settings.max_size_on_disk = max_size;
-    shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), std::move(temporary_data_on_disk_settings));
+    shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, std::move(temporary_data_on_disk_settings));
+    shared->temporary_volume_legacy = volume;
 }
 
 void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
@@ -1360,7 +1363,8 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
 
     TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
     temporary_data_on_disk_settings.max_size_on_disk = max_size;
-    shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), file_cache.get(), std::move(temporary_data_on_disk_settings));
+    shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(file_cache.get(), std::move(temporary_data_on_disk_settings));
+    shared->temporary_volume_legacy = volume;
 }
 
 void Context::setFlagsPath(const String & path)
@@ -41,15 +41,15 @@ namespace
 class AccumulatedBlockReader
 {
 public:
-    AccumulatedBlockReader(TemporaryFileStream & reader_,
+    AccumulatedBlockReader(TemporaryBlockStreamReaderHolder reader_,
                            std::mutex & mutex_,
                            size_t result_block_size_ = 0)
-        : reader(reader_)
+        : reader(std::move(reader_))
         , mutex(mutex_)
         , result_block_size(result_block_size_)
     {
-        if (!reader.isWriteFinished())
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Reading not finished file");
+        if (!reader)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Reader is nullptr");
     }
 
     Block read()
@@ -63,7 +63,7 @@ namespace
         size_t rows_read = 0;
         do
         {
-            Block block = reader.read();
+            Block block = reader->read();
             rows_read += block.rows();
             if (!block)
             {
@@ -81,7 +81,7 @@ namespace
     }
 
 private:
-    TemporaryFileStream & reader;
+    TemporaryBlockStreamReaderHolder reader;
     std::mutex & mutex;
 
     const size_t result_block_size;
@@ -124,12 +124,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable
 public:
     using BucketLock = std::unique_lock<std::mutex>;
 
-    explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, LoggerPtr log_)
-        : idx{bucket_index_}
-        , left_file{left_file_}
-        , right_file{right_file_}
-        , state{State::WRITING_BLOCKS}
-        , log{log_}
+    explicit FileBucket(size_t bucket_index_, TemporaryBlockStreamHolder left_file_, TemporaryBlockStreamHolder right_file_, LoggerPtr log_)
+        : idx(bucket_index_)
+        , left_file(std::move(left_file_))
+        , right_file(std::move(right_file_))
+        , state(State::WRITING_BLOCKS)
+        , log(log_)
     {
     }
 
@@ -157,12 +157,6 @@ public:
         return addBlockImpl(block, right_file, lock);
     }
 
-    bool finished() const
-    {
-        std::unique_lock<std::mutex> left_lock(left_file_mutex);
-        return left_file.isEof();
-    }
-
     bool empty() const { return is_empty.load(); }
 
     AccumulatedBlockReader startJoining()
@@ -172,24 +166,21 @@ public:
             std::unique_lock<std::mutex> left_lock(left_file_mutex);
             std::unique_lock<std::mutex> right_lock(right_file_mutex);
 
-            left_file.finishWriting();
-            right_file.finishWriting();
-
             state = State::JOINING_BLOCKS;
         }
-        return AccumulatedBlockReader(right_file, right_file_mutex);
+        return AccumulatedBlockReader(right_file.getReadStream(), right_file_mutex);
     }
 
     AccumulatedBlockReader getLeftTableReader()
     {
         ensureState(State::JOINING_BLOCKS);
-        return AccumulatedBlockReader(left_file, left_file_mutex);
+        return AccumulatedBlockReader(left_file.getReadStream(), left_file_mutex);
     }
 
     const size_t idx;
 
 private:
-    bool addBlockImpl(const Block & block, TemporaryFileStream & writer, std::unique_lock<std::mutex> & lock)
+    bool addBlockImpl(const Block & block, TemporaryBlockStreamHolder & writer, std::unique_lock<std::mutex> & lock)
     {
         ensureState(State::WRITING_BLOCKS);
 
@@ -199,7 +190,7 @@ private:
         if (block.rows())
             is_empty = false;
 
-        writer.write(block);
+        writer->write(block);
         return true;
     }
 
@@ -217,8 +208,8 @@ private:
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid state transition, expected {}, got {}", expected, state.load());
     }
 
-    TemporaryFileStream & left_file;
-    TemporaryFileStream & right_file;
+    TemporaryBlockStreamHolder left_file;
+    TemporaryBlockStreamHolder right_file;
     mutable std::mutex left_file_mutex;
     mutable std::mutex right_file_mutex;
 
@@ -274,7 +265,7 @@ GraceHashJoin::GraceHashJoin(
     , max_num_buckets{context->getSettingsRef()[Setting::grace_hash_join_max_buckets]}
     , left_key_names(table_join->getOnlyClause().key_names_left)
     , right_key_names(table_join->getOnlyClause().key_names_right)
-    , tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
+    , tmp_data(tmp_data_->childScope(CurrentMetrics::TemporaryFilesForJoin))
    , hash_join(makeInMemoryJoin("grace0"))
    , hash_join_sample_block(hash_join->savedBlockSample())
 {
@@ -398,10 +389,10 @@ void GraceHashJoin::addBuckets(const size_t bucket_count)
     for (size_t i = 0; i < bucket_count; ++i)
     try
     {
-        auto & left_file = tmp_data->createStream(left_sample_block);
-        auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
+        TemporaryBlockStreamHolder left_file = TemporaryBlockStreamHolder(left_sample_block, tmp_data.get());
+        TemporaryBlockStreamHolder right_file = TemporaryBlockStreamHolder(prepareRightBlock(right_sample_block), tmp_data.get());
 
-        BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, left_file, right_file, log);
+        BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, std::move(left_file), std::move(right_file), log);
         tmp_buckets.emplace_back(std::move(new_bucket));
     }
     catch (...)
@@ -632,12 +623,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
     for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
     {
         current_bucket = buckets[bucket_idx].get();
-        if (current_bucket->finished() || current_bucket->empty())
+        if (current_bucket->empty())
         {
-            LOG_TRACE(log, "Skipping {} {} bucket {}",
-                current_bucket->finished() ? "finished" : "",
-                current_bucket->empty() ? "empty" : "",
-                bucket_idx);
+            LOG_TRACE(log, "Skipping empty bucket {}", bucket_idx);
             continue;
         }
 
@@ -132,7 +132,7 @@ private:
     Names left_key_names;
     Names right_key_names;
 
-    TemporaryDataOnDiskPtr tmp_data;
+    TemporaryDataOnDiskScopePtr tmp_data;
 
     Buckets buckets;
     mutable SharedMutex rehash_mutex;
@@ -35,11 +35,6 @@
 #include <Interpreters/HashJoin/HashJoinMethods.h>
 #include <Interpreters/HashJoin/JoinUsedFlags.h>
 
-namespace CurrentMetrics
-{
-    extern const Metric TemporaryFilesForJoin;
-}
-
 namespace DB
 {
 
@@ -64,7 +59,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
 {
     size_t left_position;
     size_t right_block;
-    std::unique_ptr<TemporaryFileStream::Reader> reader;
+    TemporaryBlockStreamReaderHolder reader;
 };
 
@@ -106,10 +101,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
     , instance_id(instance_id_)
     , asof_inequality(table_join->getAsofInequality())
     , data(std::make_shared<RightTableData>())
-    , tmp_data(
-          table_join_->getTempDataOnDisk()
-              ? std::make_unique<TemporaryDataOnDisk>(table_join_->getTempDataOnDisk(), CurrentMetrics::TemporaryFilesForJoin)
-              : nullptr)
+    , tmp_data(table_join_->getTempDataOnDisk())
     , right_sample_block(right_sample_block_)
     , max_joined_block_rows(table_join->maxJoinedBlockRows())
     , instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
@@ -520,10 +512,9 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
         && (tmp_stream || (max_bytes_in_join && getTotalByteCount() + block_to_save.allocatedBytes() >= max_bytes_in_join)
             || (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
     {
-        if (tmp_stream == nullptr)
-        {
-            tmp_stream = &tmp_data->createStream(right_sample_block);
-        }
+        if (!tmp_stream)
+            tmp_stream = TemporaryBlockStreamHolder(right_sample_block, tmp_data.get());
+
         tmp_stream->write(block_to_save);
         return true;
     }
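The lazy-creation check above now relies on the holder's `operator bool` rather than a raw pointer; the same idiom in isolation (the helper function is hypothetical, the types are from this commit):

```cpp
/// Sketch: a default-constructed TemporaryBlockStreamHolder is empty and falsy,
/// so the spill file is created only on the first write.
void appendToSpill(TemporaryBlockStreamHolder & tmp_stream, const Block & header,
                   TemporaryDataOnDiskScope * scope, const Block & block)
{
    if (!tmp_stream)
        tmp_stream = TemporaryBlockStreamHolder(header, scope);  /// move-assign a fresh holder
    tmp_stream->write(block);
}
```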
@@ -730,7 +721,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
 {
     size_t start_left_row = 0;
     size_t start_right_block = 0;
-    std::unique_ptr<TemporaryFileStream::Reader> reader = nullptr;
+    TemporaryBlockStreamReaderHolder reader;
     if (not_processed)
     {
         auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed);
@@ -804,11 +795,9 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
 
         if (tmp_stream && rows_added <= max_joined_block_rows)
         {
-            if (reader == nullptr)
-            {
-                tmp_stream->finishWritingAsyncSafe();
-                reader = tmp_stream->getReadStream();
-            }
+            if (!reader)
+                reader = tmp_stream.getReadStream();
+
             while (auto block_right = reader->read())
             {
                 ++block_number;
|
@ -423,8 +423,9 @@ private:
|
||||
std::vector<Sizes> key_sizes;
|
||||
|
||||
/// Needed to do external cross join
|
||||
TemporaryDataOnDiskPtr tmp_data;
|
||||
TemporaryFileStream* tmp_stream{nullptr};
|
||||
TemporaryDataOnDiskScopePtr tmp_data;
|
||||
TemporaryBlockStreamHolder tmp_stream;
|
||||
mutable std::once_flag finish_writing;
|
||||
|
||||
/// Block with columns from the right-side table.
|
||||
Block right_sample_block;
|
||||
|
@@ -20,6 +20,11 @@
 #include <memory>
 #include <base/types.h>
 
+namespace CurrentMetrics
+{
+    extern const Metric TemporaryFilesForJoin;
+}
+
 namespace DB
 {
 
@@ -265,7 +270,7 @@ public:
 
     VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
 
-    TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data; }
+    TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data ? tmp_data->childScope(CurrentMetrics::TemporaryFilesForJoin) : nullptr; }
 
     ActionsDAG createJoinedBlockActions(ContextPtr context) const;
 
@@ -27,11 +27,266 @@ namespace DB
 
 namespace ErrorCodes
 {
-    extern const int TOO_MANY_ROWS_OR_BYTES;
+    extern const int INVALID_STATE;
     extern const int LOGICAL_ERROR;
     extern const int NOT_ENOUGH_SPACE;
+    extern const int TOO_MANY_ROWS_OR_BYTES;
 }
 
+namespace
+{
+
+inline CompressionCodecPtr getCodec(const TemporaryDataOnDiskSettings & settings)
+{
+    if (settings.compression_codec.empty())
+        return CompressionCodecFactory::instance().get("NONE");
+
+    return CompressionCodecFactory::instance().get(settings.compression_codec);
+}
+
+}
+
+TemporaryFileHolder::TemporaryFileHolder()
+{
+    ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
+}
+
+class TemporaryFileInLocalCache : public TemporaryFileHolder
+{
+public:
+    explicit TemporaryFileInLocalCache(FileCache & file_cache, size_t max_file_size = 0)
+    {
+        const auto key = FileSegment::Key::random();
+        segment_holder = file_cache.set(
+            key, 0, std::max(10_MiB, max_file_size),
+            CreateFileSegmentSettings(FileSegmentKind::Ephemeral), FileCache::getCommonUser());
+
+        chassert(segment_holder->size() == 1);
+        segment_holder->front().getKeyMetadata()->createBaseDirectory(/* throw_if_failed */true);
+    }
+
+    std::unique_ptr<WriteBuffer> write() override
+    {
+        return std::make_unique<WriteBufferToFileSegment>(&segment_holder->front());
+    }
+
+    std::unique_ptr<ReadBuffer> read(size_t buffer_size) const override
+    {
+        return std::make_unique<ReadBufferFromFile>(segment_holder->front().getPath(), /* buf_size = */ buffer_size);
+    }
+
+    String describeFilePath() const override
+    {
+        return fmt::format("fscache://{}", segment_holder->front().getPath());
+    }
+
+private:
+    FileSegmentsHolderPtr segment_holder;
+};
+
+class TemporaryFileOnLocalDisk : public TemporaryFileHolder
+{
+public:
+    explicit TemporaryFileOnLocalDisk(VolumePtr volume, size_t max_file_size = 0)
+        : path_to_file("tmp" + toString(UUIDHelpers::generateV4()))
+    {
+        if (max_file_size > 0)
+        {
+            auto reservation = volume->reserve(max_file_size);
+            if (!reservation)
+                throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space on temporary disk");
+            disk = reservation->getDisk();
+        }
+        else
+        {
+            disk = volume->getDisk();
+        }
+        chassert(disk);
+    }
+
+    std::unique_ptr<WriteBuffer> write() override
+    {
+        return disk->writeFile(path_to_file);
+    }
+
+    std::unique_ptr<ReadBuffer> read(size_t buffer_size) const override
+    {
+        ReadSettings settings;
+        settings.local_fs_buffer_size = buffer_size;
+        settings.remote_fs_buffer_size = buffer_size;
+        settings.prefetch_buffer_size = buffer_size;
+
+        return disk->readFile(path_to_file, settings);
+    }
+
+    String describeFilePath() const override
+    {
+        return fmt::format("disk({})://{}/{}", disk->getName(), disk->getPath(), path_to_file);
+    }
+
+    ~TemporaryFileOnLocalDisk() override
+    try
+    {
+        if (disk->exists(path_to_file))
+            disk->removeRecursive(path_to_file);
+        else
+            LOG_WARNING(getLogger("TemporaryFileOnLocalDisk"), "Temporary path '{}' does not exist in '{}' on disk {}", path_to_file, disk->getPath(), disk->getName());
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+
+private:
+    DiskPtr disk;
+    String path_to_file;
+};
+
+TemporaryFileProvider createTemporaryFileProvider(VolumePtr volume)
+{
+    if (!volume)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Volume is not initialized");
+    return [volume](size_t max_size) -> std::unique_ptr<TemporaryFileHolder>
+    {
+        return std::make_unique<TemporaryFileOnLocalDisk>(volume, max_size);
+    };
+}
+
+TemporaryFileProvider createTemporaryFileProvider(FileCache * file_cache)
+{
+    if (!file_cache || !file_cache->isInitialized())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "File cache is not initialized");
+    return [file_cache](size_t max_size) -> std::unique_ptr<TemporaryFileHolder>
+    {
+        return std::make_unique<TemporaryFileInLocalCache>(*file_cache, max_size);
+    };
+}
+
+TemporaryDataOnDiskScopePtr TemporaryDataOnDiskScope::childScope(CurrentMetrics::Metric current_metric)
+{
+    TemporaryDataOnDiskSettings child_settings = settings;
+    child_settings.current_metric = current_metric;
+    return std::make_shared<TemporaryDataOnDiskScope>(shared_from_this(), child_settings);
+}
+
+TemporaryDataReadBuffer::TemporaryDataReadBuffer(std::unique_ptr<ReadBuffer> in_)
+    : ReadBuffer(nullptr, 0)
+    , compressed_buf(std::move(in_))
+{
+    BufferBase::set(compressed_buf->buffer().begin(), compressed_buf->buffer().size(), compressed_buf->offset());
+}
+
+bool TemporaryDataReadBuffer::nextImpl()
+{
+    compressed_buf->position() = position();
+    if (!compressed_buf->next())
+    {
+        set(compressed_buf->position(), 0);
+        return false;
+    }
+    BufferBase::set(compressed_buf->buffer().begin(), compressed_buf->buffer().size(), compressed_buf->offset());
+    return true;
+}
+
+TemporaryDataBuffer::TemporaryDataBuffer(TemporaryDataOnDiskScope * parent_, size_t max_file_size)
+    : WriteBuffer(nullptr, 0)
+    , parent(parent_)
+    , file_holder(parent->file_provider(max_file_size == 0 ? parent->getSettings().max_size_on_disk : max_file_size))
+    , out_compressed_buf(file_holder->write(), getCodec(parent->getSettings()))
+{
+    WriteBuffer::set(out_compressed_buf->buffer().begin(), out_compressed_buf->buffer().size());
+}
+
+void TemporaryDataBuffer::nextImpl()
+{
+    if (!out_compressed_buf)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file buffer writing has been finished");
+
+    out_compressed_buf->position() = position();
+    out_compressed_buf->next();
+    BufferBase::set(out_compressed_buf->buffer().begin(), out_compressed_buf->buffer().size(), out_compressed_buf->offset());
+    updateAllocAndCheck();
+}
+
+String TemporaryDataBuffer::describeFilePath() const
+{
+    return file_holder->describeFilePath();
+}
+
+TemporaryDataBuffer::~TemporaryDataBuffer()
+{
+    if (out_compressed_buf)
+        /// neither read() nor finishWriting() was called
+        cancel();
+}
+
+void TemporaryDataBuffer::cancelImpl() noexcept
+{
+    if (out_compressed_buf)
+    {
+        /// CompressedWriteBuffer doesn't call cancel/finalize for wrapped buffer
+        out_compressed_buf->cancel();
+        out_compressed_buf.getHolder()->cancel();
+        out_compressed_buf.reset();
+    }
+}
+
+void TemporaryDataBuffer::finalizeImpl()
+{
+    if (!out_compressed_buf)
+        return;
+
+    /// CompressedWriteBuffer doesn't call cancel/finalize for wrapped buffer
+    out_compressed_buf->finalize();
+    out_compressed_buf.getHolder()->finalize();
+
+    updateAllocAndCheck();
+    out_compressed_buf.reset();
+}
+
+TemporaryDataBuffer::Stat TemporaryDataBuffer::finishWriting()
+{
+    /// TemporaryDataBuffer::read can be called from multiple threads
+    std::call_once(write_finished, [this]
+    {
+        if (canceled)
+            throw Exception(ErrorCodes::INVALID_STATE, "Writing to temporary file buffer was not successful");
+        next();
+        finalize();
+    });
+    return stat;
+}
+
+std::unique_ptr<ReadBuffer> TemporaryDataBuffer::read()
+{
+    finishWriting();
+
+    /// Keep buffer size less than file size, to avoid memory overhead for large amounts of small files
+    size_t buffer_size = std::min<size_t>(stat.compressed_size, DBMS_DEFAULT_BUFFER_SIZE);
+    return std::make_unique<TemporaryDataReadBuffer>(file_holder->read(buffer_size));
+}
+
+void TemporaryDataBuffer::updateAllocAndCheck()
+{
+    if (!out_compressed_buf)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file buffer writing has been finished");
+
+    size_t new_compressed_size = out_compressed_buf->getCompressedBytes();
+    size_t new_uncompressed_size = out_compressed_buf->getUncompressedBytes();
+
+    if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR,
+            "Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
+            file_holder ? file_holder->describeFilePath() : "NULL",
+            new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
+    }
+
+    parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
+    stat.compressed_size = new_compressed_size;
+    stat.uncompressed_size = new_uncompressed_size;
+}
+
 void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta)
 {
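A small round trip through the provider/holder pair defined above (an illustrative helper, assuming a valid volume; this is essentially what `TemporaryDataBuffer`'s constructor and `read()` do internally):

```cpp
/// Sketch only: demonstrates the TemporaryFileHolder contract.
void roundTrip(VolumePtr volume)
{
    TemporaryFileProvider provider = createTemporaryFileProvider(volume);
    std::unique_ptr<TemporaryFileHolder> file = provider(/* max_size */ 0);

    auto out = file->write();
    out->write("payload", 7);
    out->finalize();                    /// the writer must be finished before reading

    auto in = file->read(/* buffer_size */ 4096);
    char buf[7];
    in->readStrict(buf, sizeof(buf));   /// the file is removed when `file` is destroyed
}
```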
@@ -54,391 +309,25 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssiz
     stat.uncompressed_size += uncompressed_delta;
 }
 
-TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
-    : TemporaryDataOnDiskScope(parent_, parent_->getSettings())
-{}
-
-TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope)
-    : TemporaryDataOnDiskScope(parent_, parent_->getSettings())
-    , current_metric_scope(metric_scope)
-{}
-
-std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(size_t max_file_size)
-{
-    if (file_cache && file_cache->isInitialized())
-    {
-        auto holder = createCacheFile(max_file_size);
-        return std::make_unique<WriteBufferToFileSegment>(std::move(holder));
-    }
-    if (volume)
-    {
-        auto tmp_file = createRegularFile(max_file_size);
-        return std::make_unique<WriteBufferFromTemporaryFile>(std::move(tmp_file));
-    }
-
-    throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
-}
-
-TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
-{
-    if (file_cache && file_cache->isInitialized())
-    {
-        auto holder = createCacheFile(max_file_size);
-
-        std::lock_guard lock(mutex);
-        TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(holder), header, this));
-        return *tmp_stream;
-    }
-    if (volume)
-    {
-        auto tmp_file = createRegularFile(max_file_size);
-        std::lock_guard lock(mutex);
-        TemporaryFileStreamPtr & tmp_stream
-            = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
-        return *tmp_stream;
-    }
-
-    throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
-}
-
-FileSegmentsHolderPtr TemporaryDataOnDisk::createCacheFile(size_t max_file_size)
-{
-    if (!file_cache)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache");
-
-    ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
-
-    const auto key = FileSegment::Key::random();
-    auto holder = file_cache->set(
-        key, 0, std::max(10_MiB, max_file_size),
-        CreateFileSegmentSettings(FileSegmentKind::Ephemeral), FileCache::getCommonUser());
-
-    chassert(holder->size() == 1);
-    holder->back().getKeyMetadata()->createBaseDirectory(/* throw_if_failed */true);
-
-    return holder;
-}
-
-TemporaryFileOnDiskHolder TemporaryDataOnDisk::createRegularFile(size_t max_file_size)
-{
-    if (!volume)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no volume");
-
-    DiskPtr disk;
-    if (max_file_size > 0)
-    {
-        auto reservation = volume->reserve(max_file_size);
-        if (!reservation)
-            throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space on temporary disk");
-        disk = reservation->getDisk();
-    }
-    else
-    {
-        disk = volume->getDisk();
-    }
-    /// We do not increment ProfileEvents::ExternalProcessingFilesTotal here because it is incremented in TemporaryFileOnDisk constructor.
-    return std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
-}
-
-std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
-{
-    std::vector<TemporaryFileStream *> res;
-    std::lock_guard lock(mutex);
-    res.reserve(streams.size());
-    for (const auto & stream : streams)
-        res.push_back(stream.get());
-    return res;
-}
-
-bool TemporaryDataOnDisk::empty() const
-{
-    std::lock_guard lock(mutex);
-    return streams.empty();
-}
-
-static inline CompressionCodecPtr getCodec(const TemporaryDataOnDiskSettings & settings)
-{
-    if (settings.compression_codec.empty())
-        return CompressionCodecFactory::instance().get("NONE");
-
-    return CompressionCodecFactory::instance().get(settings.compression_codec);
-}
-
-struct TemporaryFileStream::OutputWriter
-{
-    OutputWriter(std::unique_ptr<WriteBuffer> out_buf_, const Block & header_, const TemporaryDataOnDiskSettings & settings)
-        : out_buf(std::move(out_buf_))
-        , out_compressed_buf(*out_buf, getCodec(settings))
-        , out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
-    {
-    }
-
-    size_t write(const Block & block)
-    {
-        if (finalized)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized stream");
-        size_t written_bytes = out_writer.write(block);
-        num_rows += block.rows();
-        return written_bytes;
-    }
-
-    void flush()
-    {
-        if (finalized)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot flush finalized stream");
-
-        out_compressed_buf.next();
-        out_buf->next();
-        out_writer.flush();
-    }
-
-    void finalize()
-    {
-        if (finalized)
-            return;
-
-        /// if we called finalize() explicitly, and got an exception,
-        /// we don't want to get it again in the destructor, so set finalized flag first
-        finalized = true;
-
-        out_writer.flush();
-        out_compressed_buf.finalize();
-        out_buf->finalize();
-    }
-
-    ~OutputWriter()
-    {
-        try
-        {
-            finalize();
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
-    }
-
-    std::unique_ptr<WriteBuffer> out_buf;
-    CompressedWriteBuffer out_compressed_buf;
-    NativeWriter out_writer;
-
-    std::atomic_size_t num_rows = 0;
-
-    bool finalized = false;
-};
-
-TemporaryFileStream::Reader::Reader(const String & path_, const Block & header_, size_t size_)
-    : path(path_)
-    , size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
-    , header(header_)
-{
-    LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
-}
-
-TemporaryFileStream::Reader::Reader(const String & path_, size_t size_)
-    : path(path_)
-    , size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
-{
-    LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
-}
-
-Block TemporaryFileStream::Reader::read()
-{
-    if (!in_reader)
-    {
-        if (fs::exists(path))
-            in_file_buf = std::make_unique<ReadBufferFromFile>(path, size);
-        else
-            in_file_buf = std::make_unique<ReadBufferFromEmptyFile>();
-
-        in_compressed_buf = std::make_unique<CompressedReadBuffer>(*in_file_buf);
-        if (header.has_value())
-            in_reader = std::make_unique<NativeReader>(*in_compressed_buf, header.value(), DBMS_TCP_PROTOCOL_VERSION);
-        else
-            in_reader = std::make_unique<NativeReader>(*in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION);
-    }
-    return in_reader->read();
-}
-
-TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
-    : parent(parent_)
-    , header(header_)
-    , file(std::move(file_))
-    , out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header, parent->settings))
-{
-    LOG_TEST(getLogger("TemporaryFileStream"), "Writing to temporary file {}", file->getAbsolutePath());
-}
-
-TemporaryFileStream::TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_)
-    : parent(parent_)
-    , header(header_)
-    , segment_holder(std::move(segments_))
-{
-    if (segment_holder->size() != 1)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream can be created only from single segment");
-    auto out_buf = std::make_unique<WriteBufferToFileSegment>(&segment_holder->front());
-
-    LOG_TEST(getLogger("TemporaryFileStream"), "Writing to temporary file {}", out_buf->getFileName());
-    out_writer = std::make_unique<OutputWriter>(std::move(out_buf), header, parent_->settings);
-}
-
-size_t TemporaryFileStream::write(const Block & block)
-{
-    if (!out_writer)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been finished");
-
-    updateAllocAndCheck();
-    size_t bytes_written = out_writer->write(block);
-    return bytes_written;
-}
-
-void TemporaryFileStream::flush()
-{
-    if (!out_writer)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been finished");
-
-    out_writer->flush();
-}
-
-TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
-{
-    if (isWriteFinished())
-        return stat;
-
-    if (out_writer)
-    {
-        out_writer->finalize();
-        /// The amount of written data can be changed after finalization, some buffers can be flushed
-        /// Need to update the stat
-        updateAllocAndCheck();
-        out_writer.reset();
-
-        /// reader will be created at the first read call, not to consume memory before it is needed
-    }
-    return stat;
-}
-
-TemporaryFileStream::Stat TemporaryFileStream::finishWritingAsyncSafe()
-{
-    std::call_once(finish_writing, [this]{ finishWriting(); });
-    return stat;
-}
-
-bool TemporaryFileStream::isWriteFinished() const
-{
-    assert(in_reader == nullptr || out_writer == nullptr);
-    return out_writer == nullptr;
-}
-
-Block TemporaryFileStream::read()
-{
-    if (!isWriteFinished())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
-
-    if (isEof())
-        return {};
-
-    if (!in_reader)
-    {
-        in_reader = std::make_unique<Reader>(getPath(), header, getSize());
-    }
-
-    Block block = in_reader->read();
-    if (!block)
-    {
-        /// finalize earlier to release resources, do not wait for the destructor
-        this->release();
-    }
-    return block;
-}
-
-std::unique_ptr<TemporaryFileStream::Reader> TemporaryFileStream::getReadStream()
-{
-    if (!isWriteFinished())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
-
-    if (isEof())
-        return nullptr;
-
-    return std::make_unique<Reader>(getPath(), header, getSize());
-}
-
-void TemporaryFileStream::updateAllocAndCheck()
-{
-    assert(out_writer);
-    size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes();
-    size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes();
-
-    if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
-    {
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
-            getPath(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
-    }
-
-    parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
-    stat.compressed_size = new_compressed_size;
-    stat.uncompressed_size = new_uncompressed_size;
-    stat.num_rows = out_writer->num_rows;
-}
-
-bool TemporaryFileStream::isEof() const
-{
-    return file == nullptr && !segment_holder;
-}
-
-void TemporaryFileStream::release()
-{
-    if (in_reader)
-        in_reader.reset();
-
-    if (out_writer)
-    {
-        out_writer->finalize();
-        out_writer.reset();
-    }
-
-    if (file)
-    {
-        file.reset();
-        parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
-    }
-
-    if (segment_holder)
-        segment_holder.reset();
-}
-
-String TemporaryFileStream::getPath() const
-{
-    if (file)
-        return file->getAbsolutePath();
-    if (segment_holder && !segment_holder->empty())
-        return segment_holder->front().getPath();
-
-    throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream has no file");
-}
-
-size_t TemporaryFileStream::getSize() const
-{
-    if (file)
-        return file->getDisk()->getFileSize(file->getRelativePath());
-    if (segment_holder && !segment_holder->empty())
-        return segment_holder->front().getReservedSize();
-
-    throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream has no file");
-}
-
-TemporaryFileStream::~TemporaryFileStream()
-{
-    try
-    {
-        release();
-    }
-    catch (...)
-    {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
-        assert(false); /// deltaAllocAndCheck with negative can't throw exception
-    }
-}
+TemporaryBlockStreamHolder::TemporaryBlockStreamHolder(const Block & header_, TemporaryDataOnDiskScope * parent_, size_t max_file_size)
+    : WrapperGuard(std::make_unique<TemporaryDataBuffer>(parent_, max_file_size), DBMS_TCP_PROTOCOL_VERSION, header_)
+    , header(header_)
+{}
+
+TemporaryDataBuffer::Stat TemporaryBlockStreamHolder::finishWriting() const
+{
+    if (!holder)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary block stream is not initialized");
+
+    impl->flush();
+    return holder->finishWriting();
+}
+
+TemporaryBlockStreamReaderHolder TemporaryBlockStreamHolder::getReadStream() const
+{
+    if (!holder)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary block stream is not initialized");
+
+    return TemporaryBlockStreamReaderHolder(holder->read(), header, DBMS_TCP_PROTOCOL_VERSION);
+}
 
 }
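The write → finishWriting → read protocol of the new `TemporaryDataBuffer`, condensed into a hypothetical helper:

```cpp
/// Sketch: finishWriting() is guarded by std::call_once, so read() may be
/// reached from several threads and only the first call flushes and finalizes.
void rawSpill(TemporaryDataOnDiskScope * scope)
{
    TemporaryDataBuffer out(scope);
    out.write("1234567890", 10);

    TemporaryDataBuffer::Stat stat = out.finishWriting();

    std::unique_ptr<ReadBuffer> in = out.read();   /// read() implies finishWriting()
    String data(stat.uncompressed_size, '\0');
    in->readStrict(data.data(), data.size());
}
```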
@@ -4,15 +4,21 @@
 #include <mutex>
 #include <boost/noncopyable.hpp>
 
-#include <IO/ReadBufferFromFile.h>
-#include <Common/CurrentMetrics.h>
 #include <Compression/CompressedReadBuffer.h>
-#include <Formats/NativeReader.h>
 #include <Core/Block.h>
+#include <Compression/CompressedWriteBuffer.h>
 
 #include <Disks/IVolume.h>
 #include <Disks/TemporaryFileOnDisk.h>
-#include <Interpreters/Cache/FileSegment.h>
+#include <Common/CurrentMetrics.h>
+
+#include <Formats/NativeReader.h>
+#include <Formats/NativeWriter.h>
+
+#include <Interpreters/Cache/FileSegment.h>
+
+#include <IO/ReadBufferFromFile.h>
+
+class FileCacheTest_TemporaryDataReadBufferSize_Test;
 
 namespace CurrentMetrics
 {
@@ -25,11 +31,10 @@ namespace DB
 class TemporaryDataOnDiskScope;
 using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
 
-class TemporaryDataOnDisk;
-using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
+class TemporaryDataBuffer;
+using TemporaryDataBufferPtr = std::unique_ptr<TemporaryDataBuffer>;
 
-class TemporaryFileStream;
-using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
+class TemporaryFileHolder;
 
 class FileCache;
 
@@ -40,15 +45,26 @@ struct TemporaryDataOnDiskSettings
 
     /// Compression codec for temporary data, if empty no compression will be used. LZ4 by default
     String compression_codec = "LZ4";
 
     /// Read/Write internal buffer size
     size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
 
+    /// Metric counter to increment when temporary files in the current scope are created
+    CurrentMetrics::Metric current_metric = CurrentMetrics::TemporaryFilesUnknown;
 };
 
+/// Creates temporary files located on specified resource (disk, fs_cache, etc.)
+using TemporaryFileProvider = std::function<std::unique_ptr<TemporaryFileHolder>(size_t)>;
+TemporaryFileProvider createTemporaryFileProvider(VolumePtr volume);
+TemporaryFileProvider createTemporaryFileProvider(FileCache * file_cache);
+
 /*
  * Used to account amount of temporary data written to disk.
  * If limit is set, throws exception if limit is exceeded.
  * Data can be nested, so parent scope accounts all data written by children.
  * Scopes are: global -> per-user -> per-query -> per-purpose (sorting, aggregation, etc).
 */
-class TemporaryDataOnDiskScope : boost::noncopyable
+class TemporaryDataOnDiskScope : boost::noncopyable, public std::enable_shared_from_this<TemporaryDataOnDiskScope>
 {
 public:
     struct StatAtomic
@@ -57,164 +73,155 @@ public:
         std::atomic<size_t> uncompressed_size;
     };
 
-    explicit TemporaryDataOnDiskScope(VolumePtr volume_, TemporaryDataOnDiskSettings settings_)
-        : volume(std::move(volume_))
-        , settings(std::move(settings_))
-    {}
-
-    explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * file_cache_, TemporaryDataOnDiskSettings settings_)
-        : volume(std::move(volume_))
-        , file_cache(file_cache_)
-        , settings(std::move(settings_))
-    {}
-
-    explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, TemporaryDataOnDiskSettings settings_)
+    /// Root scope
+    template <typename T>
+    TemporaryDataOnDiskScope(T && storage, TemporaryDataOnDiskSettings settings_)
+        : file_provider(createTemporaryFileProvider(std::forward<T>(storage)))
+        , settings(std::move(settings_))
+    {}
+
+    TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, TemporaryDataOnDiskSettings settings_)
         : parent(std::move(parent_))
-        , volume(parent->volume)
-        , file_cache(parent->file_cache)
+        , file_provider(parent->file_provider)
         , settings(std::move(settings_))
     {}
 
-    /// TODO: remove
-    /// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
-    VolumePtr getVolume() const { return volume; }
+    TemporaryDataOnDiskScopePtr childScope(CurrentMetrics::Metric current_metric);
 
     const TemporaryDataOnDiskSettings & getSettings() const { return settings; }
 
 protected:
+    friend class TemporaryDataBuffer;
+
     void deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta);
 
     TemporaryDataOnDiskScopePtr parent = nullptr;
 
-    VolumePtr volume = nullptr;
-    FileCache * file_cache = nullptr;
+    TemporaryFileProvider file_provider;
 
     StatAtomic stat;
     const TemporaryDataOnDiskSettings settings;
 };
 
-/*
- * Holds the set of temporary files.
- * New file stream is created with `createStream`.
- * Streams are owned by this object and will be deleted when it is deleted.
- * It's a leaf node in temporary data scope tree.
- */
-class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
-{
-    friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
-
-public:
-    using TemporaryDataOnDiskScope::StatAtomic;
-
-    explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_);
-
-    explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope);
-
-    /// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
-    TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
-
-    /// Write raw data directly into buffer.
-    /// Differences from `createStream`:
-    /// 1) it doesn't account data in parent scope
-    /// 2) returned buffer owns resources (instead of TemporaryDataOnDisk itself)
-    /// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
-    std::unique_ptr<WriteBufferFromFileBase> createRawStream(size_t max_file_size = 0);
-
-    std::vector<TemporaryFileStream *> getStreams() const;
-    bool empty() const;
-
-    const StatAtomic & getStat() const { return stat; }
-
-private:
-    FileSegmentsHolderPtr createCacheFile(size_t max_file_size);
-    TemporaryFileOnDiskHolder createRegularFile(size_t max_file_size);
-
-    mutable std::mutex mutex;
-    std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
-
-    typename CurrentMetrics::Metric current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
-};
+/** Used to hold the wrapper and wrapped object together.
+  * This class provides a convenient way to manage the lifetime of both the wrapper and the wrapped object.
+  * The wrapper class (Impl) stores a reference to the wrapped object (Holder), and both objects are owned by this class.
+  * The lifetime of the wrapper and the wrapped object should be the same.
+  * This pattern is commonly used when the caller only needs to interact with the wrapper and doesn't need to be aware of the wrapped object.
+  * Examples: CompressedWriteBuffer and WriteBuffer, and NativeReader and ReadBuffer.
+  */
+template <typename Impl, typename Holder>
+class WrapperGuard
+{
+public:
+    WrapperGuard() = default;
+
+    template <typename ... Args>
+    WrapperGuard(std::unique_ptr<Holder> holder_, Args && ... args)
+        : holder(std::move(holder_))
+        , impl(std::make_unique<Impl>(*holder, std::forward<Args>(args)...))
+    {}
+
+    Impl * operator->() { return impl.get(); }
+    const Impl * operator->() const { return impl.get(); }
+    Impl & operator*() { return *impl; }
+    const Impl & operator*() const { return *impl; }
+    operator bool() const { return impl != nullptr; }
+
+    const Holder * getHolder() const { return holder.get(); }
+    Holder * getHolder() { return holder.get(); }
+
+    void reset()
+    {
+        impl.reset();
+        holder.reset();
+    }
+
+protected:
+    std::unique_ptr<Holder> holder;
+    std::unique_ptr<Impl> impl;
+};
 
-/*
- * Data can be written into this stream and then read.
- * After finish writing, call `finishWriting` and then either call `read` or 'getReadStream'(only one of the two) to read the data.
- * Account amount of data written to disk in parent scope.
- */
-class TemporaryFileStream : boost::noncopyable
+/// Owns temporary file and provides access to it.
+/// On destruction, file is removed and all resources are freed.
+/// Lifetime of read/write buffers should be less than lifetime of TemporaryFileHolder.
+class TemporaryFileHolder
 {
 public:
-    struct Reader
-    {
-        Reader(const String & path, const Block & header_, size_t size = 0);
-
-        explicit Reader(const String & path, size_t size = 0);
-
-        Block read();
-
-        const std::string path;
-        const size_t size;
-        const std::optional<Block> header;
-
-        std::unique_ptr<ReadBufferFromFileBase> in_file_buf;
-        std::unique_ptr<CompressedReadBuffer> in_compressed_buf;
-        std::unique_ptr<NativeReader> in_reader;
-    };
+    TemporaryFileHolder();
+
+    virtual std::unique_ptr<WriteBuffer> write() = 0;
+    virtual std::unique_ptr<ReadBuffer> read(size_t buffer_size) const = 0;
+
+    /// Get location for logging purposes
+    virtual String describeFilePath() const = 0;
+
+    virtual ~TemporaryFileHolder() = default;
+};
+
+class TemporaryDataReadBuffer : public ReadBuffer
+{
+public:
+    explicit TemporaryDataReadBuffer(std::unique_ptr<ReadBuffer> in_);
+
+private:
+    friend class ::FileCacheTest_TemporaryDataReadBufferSize_Test;
+
+    bool nextImpl() override;
+
+    WrapperGuard<CompressedReadBuffer, ReadBuffer> compressed_buf;
+};
 
+/// Writes data to buffer provided by file_holder, and accounts amount of written data in parent scope.
+class TemporaryDataBuffer : public WriteBuffer
+{
+public:
     struct Stat
     {
         /// Statistics for file
         /// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
         size_t compressed_size = 0;
         size_t uncompressed_size = 0;
         size_t num_rows = 0;
     };
 
-    TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
-    TemporaryFileStream(FileSegmentsHolderPtr segments_, const Block & header_, TemporaryDataOnDisk * parent_);
-
-    size_t write(const Block & block);
-    void flush();
+    explicit TemporaryDataBuffer(TemporaryDataOnDiskScope * parent_, size_t max_file_size = 0);
+    void nextImpl() override;
+    void finalizeImpl() override;
+    void cancelImpl() noexcept override;
 
+    std::unique_ptr<ReadBuffer> read();
     Stat finishWriting();
-    Stat finishWritingAsyncSafe();
-    bool isWriteFinished() const;
 
-    std::unique_ptr<Reader> getReadStream();
+    String describeFilePath() const;
 
-    Block read();
-
-    String getPath() const;
-    size_t getSize() const;
-
-    Block getHeader() const { return header; }
-
-    /// Read finished and file released
-    bool isEof() const;
-
-    ~TemporaryFileStream();
+    ~TemporaryDataBuffer() override;
 
 private:
     void updateAllocAndCheck();
 
-    /// Release everything, close reader and writer, delete file
-    void release();
-
-    TemporaryDataOnDisk * parent;
-
-    Block header;
-
-    /// Data can be stored in file directly or in the cache
-    TemporaryFileOnDiskHolder file;
-    FileSegmentsHolderPtr segment_holder;
+    TemporaryDataOnDiskScope * parent;
+    std::unique_ptr<TemporaryFileHolder> file_holder;
+    WrapperGuard<CompressedWriteBuffer, WriteBuffer> out_compressed_buf;
+    std::once_flag write_finished;
 
     Stat stat;
+};
 
-    std::once_flag finish_writing;
+using TemporaryBlockStreamReaderHolder = WrapperGuard<NativeReader, ReadBuffer>;
 
-    struct OutputWriter;
-    std::unique_ptr<OutputWriter> out_writer;
+class TemporaryBlockStreamHolder : public WrapperGuard<NativeWriter, TemporaryDataBuffer>
+{
+public:
+    TemporaryBlockStreamHolder() = default;
 
-    std::unique_ptr<Reader> in_reader;
+    TemporaryBlockStreamHolder(const Block & header_, TemporaryDataOnDiskScope * parent_, size_t max_file_size = 0);
+
+    TemporaryBlockStreamReaderHolder getReadStream() const;
+
+    TemporaryDataBuffer::Stat finishWriting() const;
+    const Block & getHeader() const { return header; }
+
+private:
+    Block header;
+};
 
 }
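The destruction-order guarantee `WrapperGuard` relies on, restated in isolation: the member declared first (`holder`) is destroyed last, so `impl`, which holds a reference into `holder`, always dies first. A simplified re-statement (not the commit's code):

```cpp
#include <memory>
#include <utility>

template <typename Impl, typename Holder>
class Guard
{
public:
    template <typename... Args>
    Guard(std::unique_ptr<Holder> holder_, Args &&... args)
        : holder(std::move(holder_))
        , impl(std::make_unique<Impl>(*holder, std::forward<Args>(args)...))  /// Impl keeps a reference into Holder
    {}

    Impl * operator->() { return impl.get(); }

private:
    std::unique_ptr<Holder> holder;  /// declared first -> destroyed last
    std::unique_ptr<Impl> impl;      /// declared last  -> destroyed first
};
```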
@@ -934,7 +934,7 @@ static Block generateBlock(size_t size = 0)
     return block;
 }
 
-static size_t readAllTemporaryData(TemporaryFileStream & stream)
+static size_t readAllTemporaryData(NativeReader & stream)
 {
     Block block;
     size_t read_rows = 0;
@@ -947,6 +947,7 @@ static size_t readAllTemporaryData(TemporaryFileStream & stream)
 }
 
 TEST_F(FileCacheTest, temporaryData)
+try
 {
     ServerUUID::setRandomForUnitTests();
     DB::FileCacheSettings settings;
|
||||
file_cache.initialize();
|
||||
|
||||
const auto user = FileCache::getCommonUser();
|
||||
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, TemporaryDataOnDiskSettings{});
|
||||
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(&file_cache, TemporaryDataOnDiskSettings{});
|
||||
|
||||
auto some_data_holder = file_cache.getOrSet(FileCacheKey::fromPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
|
||||
|
||||
@ -982,12 +983,17 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
|
||||
size_t size_used_with_temporary_data;
|
||||
size_t segments_used_with_temporary_data;
|
||||
|
||||
|
||||
{
|
||||
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
|
||||
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
|
||||
ASSERT_TRUE(stream);
|
||||
/// Do nothitng with stream, just create it and destroy.
|
||||
}
|
||||
|
||||
auto & stream = tmp_data->createStream(generateBlock());
|
||||
|
||||
ASSERT_GT(stream.write(generateBlock(100)), 0);
|
||||
{
|
||||
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
|
||||
ASSERT_GT(stream->write(generateBlock(100)), 0);
|
||||
|
||||
ASSERT_GT(file_cache.getUsedCacheSize(), 0);
|
||||
ASSERT_GT(file_cache.getFileSegmentsNum(), 0);
|
||||
@ -995,22 +1001,22 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
size_t used_size_before_attempt = file_cache.getUsedCacheSize();
|
||||
/// data can't be evicted because it is still held by `some_data_holder`
|
||||
ASSERT_THROW({
|
||||
stream.write(generateBlock(2000));
|
||||
stream.flush();
|
||||
stream->write(generateBlock(2000));
|
||||
stream.finishWriting();
|
||||
}, DB::Exception);
|
||||
|
||||
ASSERT_THROW(stream.finishWriting(), DB::Exception);
|
||||
|
||||
ASSERT_EQ(file_cache.getUsedCacheSize(), used_size_before_attempt);
|
||||
}
|
||||
|
||||
{
|
||||
size_t before_used_size = file_cache.getUsedCacheSize();
|
||||
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
|
||||
|
||||
auto write_buf_stream = tmp_data->createRawStream();
|
||||
auto write_buf_stream = std::make_unique<TemporaryDataBuffer>(tmp_data_scope.get());
|
||||
|
||||
write_buf_stream->write("1234567890", 10);
|
||||
write_buf_stream->write("abcde", 5);
|
||||
auto read_buf = dynamic_cast<IReadableWriteBuffer *>(write_buf_stream.get())->tryGetReadBuffer();
|
||||
auto read_buf = write_buf_stream->read();
|
||||
|
||||
ASSERT_GT(file_cache.getUsedCacheSize(), before_used_size + 10);
|
||||
|
||||
@ -1023,22 +1029,22 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
}
|
||||
|
||||
{
|
||||
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
|
||||
auto & stream = tmp_data->createStream(generateBlock());
|
||||
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
|
||||
|
||||
ASSERT_GT(stream.write(generateBlock(100)), 0);
|
||||
ASSERT_GT(stream->write(generateBlock(100)), 0);
|
||||
|
||||
some_data_holder.reset();
|
||||
|
||||
stream.write(generateBlock(2000));
|
||||
stream->write(generateBlock(2000));
|
||||
|
||||
auto stat = stream.finishWriting();
|
||||
stream.finishWriting();
|
||||
|
||||
ASSERT_TRUE(fs::exists(stream.getPath()));
|
||||
ASSERT_GT(fs::file_size(stream.getPath()), 100);
|
||||
String file_path = stream.getHolder()->describeFilePath().substr(strlen("fscache://"));
|
||||
|
||||
ASSERT_EQ(stat.num_rows, 2100);
|
||||
ASSERT_EQ(readAllTemporaryData(stream), 2100);
|
||||
ASSERT_TRUE(fs::exists(file_path)) << "File " << file_path << " should exist";
|
||||
ASSERT_GT(fs::file_size(file_path), 100) << "File " << file_path << " should be larger than 100 bytes";
|
||||
|
||||
ASSERT_EQ(readAllTemporaryData(*stream.getReadStream()), 2100);
|
||||
|
||||
size_used_with_temporary_data = file_cache.getUsedCacheSize();
|
||||
segments_used_with_temporary_data = file_cache.getFileSegmentsNum();
|
||||
@ -1054,6 +1060,11 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
ASSERT_LE(file_cache.getUsedCacheSize(), size_used_before_temporary_data);
|
||||
ASSERT_LE(file_cache.getFileSegmentsNum(), segments_used_before_temporary_data);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << getCurrentExceptionMessage(true) << std::endl;
|
||||
throw;
|
||||
}
|
||||
|
||||
TEST_F(FileCacheTest, CachedReadBuffer)
{
@ -1148,18 +1159,22 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
DB::FileCache file_cache("cache", settings);
file_cache.initialize();

auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/nullptr, &file_cache, /*settings=*/TemporaryDataOnDiskSettings{});

auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(&file_cache, TemporaryDataOnDiskSettings{});

auto block = generateBlock(/*size=*/3);
auto & stream = tmp_data->createStream(block);
stream.write(block);
stream.finishWriting();
TemporaryBlockStreamHolder stream(block, tmp_data_scope.get());

/// We allocate buffer of size min(getSize(), DBMS_DEFAULT_BUFFER_SIZE)
stream->write(block);
auto stat = stream.finishWriting();

/// We allocate buffer of size min(stat.compressed_size, DBMS_DEFAULT_BUFFER_SIZE)
/// We do care about buffer size because a realistic external GROUP BY could generate 10^5 temporary files
ASSERT_EQ(stream.getSize(), 62);
ASSERT_EQ(stat.compressed_size, 62);

auto reader = stream.getReadStream();
auto * read_buf = reader.getHolder();
const auto & internal_buffer = static_cast<TemporaryDataReadBuffer *>(read_buf)->compressed_buf.getHolder()->internalBuffer();
ASSERT_EQ(internal_buffer.size(), 62);
}

/// Temporary data stored on disk
@ -1170,16 +1185,14 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
disk = createDisk("temporary_data_read_buffer_size_test_dir");
VolumePtr volume = std::make_shared<SingleDiskVolume>("volume", disk);

auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/volume, /*cache=*/nullptr, /*settings=*/TemporaryDataOnDiskSettings{});

auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(volume, TemporaryDataOnDiskSettings{});

auto block = generateBlock(/*size=*/3);
auto & stream = tmp_data->createStream(block);
stream.write(block);
stream.finishWriting();
TemporaryBlockStreamHolder stream(block, tmp_data_scope.get());
stream->write(block);
auto stat = stream.finishWriting();

ASSERT_EQ(stream.getSize(), 62);
ASSERT_EQ(stat.compressed_size, 62);
}
}

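The constant 62 is just the compressed size of the small generated block; the point of the test is the sizing rule itself. A sketch of that rule, assuming the default constant is ClickHouse's DBMS_DEFAULT_BUFFER_SIZE:

/// Sketch: choose the read-buffer size for a finished temporary file.
/// compressed_size comes from TemporaryDataBuffer::Stat.
size_t chooseReadBufferSize(size_t compressed_size, size_t default_buffer_size)
{
    /// Small spilled files get exactly-sized buffers, so 10^5 of them
    /// do not each pin a full default-sized allocation.
    return std::min(compressed_size, default_buffer_size);
}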
@ -5,6 +5,7 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Interpreters/TemporaryDataOnDisk.h>

#include <Common/logger_useful.h>

@ -29,17 +30,18 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
LoggerPtr log_,
WriteBuffer * out_row_sources_buf_,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(
header_,
num_inputs,
std::move(description_),
out_row_sources_buf_,
temp_data_buffer_.get(),
max_row_refs,
std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_))
, sign_column_number(header_.getPositionByName(sign_column))
, only_positive_sign(only_positive_sign_)
, temp_data_buffer(temp_data_buffer_)
, log(log_)
{
}

@ -11,6 +11,8 @@ namespace Poco
namespace DB
{

class TemporaryDataBuffer;

/** Merges several sorted inputs to one.
* For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
* keeps no more than one row with the value of the column `sign_column = -1` ("negative row")
@ -35,7 +37,7 @@ public:
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
LoggerPtr log_,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
bool use_average_block_sizes = false);

const char * getName() const override { return "CollapsingSortedAlgorithm"; }
@ -62,6 +64,8 @@ private:
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key

size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer = nullptr;

LoggerPtr log;

void reportIncorrectData();

@ -3,6 +3,7 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/TemporaryDataOnDisk.h>

namespace DB
{
@ -15,7 +16,7 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_,
bool use_average_block_sizes)
: header(std::move(header_))
, merged_data(use_average_block_sizes, max_block_size_, max_block_size_bytes_)

@ -9,6 +9,8 @@
namespace DB
{

class TemporaryDataBuffer;

/// Merges several sorted inputs into one sorted output.
class MergingSortedAlgorithm final : public IMergingAlgorithm
{
@ -21,7 +23,7 @@ public:
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);

void addInput();
@ -45,7 +47,7 @@ private:

/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf = nullptr;

/// Chunks currently being merged.
Inputs current_inputs;

@ -5,6 +5,7 @@
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <Processors/Merges/Algorithms/RowRef.h>
#include <Interpreters/TemporaryDataOnDisk.h>

namespace DB
{
@ -37,12 +38,13 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_,
bool use_average_block_sizes,
bool cleanup_,
bool enable_vertical_final_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows, max_block_size_bytes))
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), temp_data_buffer_.get(), max_row_refs, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows, max_block_size_bytes))
, cleanup(cleanup_), enable_vertical_final(enable_vertical_final_)
, temp_data_buffer(temp_data_buffer_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);

@ -24,6 +24,8 @@ struct ChunkSelectFinalIndices : public ChunkInfoCloneable<ChunkSelectFinalIndices>
const ColumnUInt64 * select_final_indices = nullptr;
};

class TemporaryDataBuffer;

/** Merges several sorted inputs into one.
* For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
* keeps row with max `version` value.
@ -38,7 +40,7 @@ public:
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false,
bool enable_vertical_final_ = false);
@ -59,6 +61,8 @@ private:
RowRef selected_row; /// Last row with maximum version for current primary key, may extend lifetime of chunk in input source
size_t max_pos = 0; /// The position (into current_row_sources) of the row with the highest version.

std::shared_ptr<TemporaryDataBuffer> temp_data_buffer = nullptr;

/// Sources of rows with the current primary key.
PODArray<RowSourcePart> current_row_sources;

@ -1,6 +1,7 @@
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Columns/ColumnsNumber.h>
#include <IO/WriteBuffer.h>
#include <Interpreters/TemporaryDataOnDisk.h>

namespace DB
{
@ -14,12 +15,13 @@ VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
const String & sign_column_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
WriteBuffer * out_row_sources_buf_,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_))
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), temp_data_buffer_.get(), MAX_ROWS_IN_MULTIVERSION_QUEUE, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_))
/// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_rows_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, current_keys(max_rows_in_queue)
, temp_data_buffer(temp_data_buffer_)
{
sign_column_number = header_.getPositionByName(sign_column_);
}

@ -8,6 +8,8 @@
namespace DB
{

class TemporaryDataBuffer;

/** Merges several sorted inputs to one.
* For each group of consecutive identical values of the sorting key
* (the columns by which the data is sorted, including specially specified version column),
@ -22,7 +24,7 @@ public:
SortDescription description_, const String & sign_column_,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
bool use_average_block_sizes = false);

const char * getName() const override { return "VersionedCollapsingAlgorithm"; }
@ -37,6 +39,8 @@ private:
FixedSizeDequeWithGaps<RowRef> current_keys;
Int8 sign_in_queue = 0;

std::shared_ptr<TemporaryDataBuffer> temp_data_buffer = nullptr;

std::queue<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key

void insertGap(size_t gap_size);
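All four algorithms follow the same ownership split: the derived algorithm keeps the buffer alive via shared_ptr for the whole merge, while the IMergingAlgorithmWithSharedChunks base continues to see it as a plain WriteBuffer * (TemporaryDataBuffer is itself a WriteBuffer, as its nextImpl/finalizeImpl overrides show). A reduced sketch of the pattern, with abbreviated class names rather than the real headers:

/// Reduced sketch of the ownership pattern used by the four algorithms above.
class MergingAlgorithmBase
{
public:
    explicit MergingAlgorithmBase(WriteBuffer * row_sources_out_) : row_sources_out(row_sources_out_) {}
protected:
    WriteBuffer * row_sources_out;   /// borrowed; may be nullptr when no spill target is needed
};

class SomeSortedAlgorithm : public MergingAlgorithmBase
{
public:
    explicit SomeSortedAlgorithm(std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_)
        : MergingAlgorithmBase(temp_data_buffer_.get())   /// base writes through the raw pointer
        , temp_data_buffer(std::move(temp_data_buffer_))  /// derived keeps the buffer alive
    {}
private:
    std::shared_ptr<TemporaryDataBuffer> temp_data_buffer;
};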
@ -23,7 +23,7 @@ public:
bool only_positive_sign,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,

@ -20,7 +20,7 @@ MergingSortedTransform::MergingSortedTransform(
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_,
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(

@ -20,7 +20,7 @@ public:
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool have_all_inputs_ = true);

@ -21,7 +21,7 @@ public:
const String & is_deleted_column, const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false,
bool enable_vertical_final = false)
@ -34,7 +34,7 @@ public:
version_column,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
temp_data_buffer_,
use_average_block_sizes,
cleanup,
enable_vertical_final)

@ -21,7 +21,7 @@ public:
SortDescription description_, const String & sign_column_,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
@ -31,7 +31,7 @@ public:
sign_column_,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
temp_data_buffer_,
use_average_block_sizes)
{
}
@ -20,7 +20,6 @@ struct BuildQueryPipelineSettings
ExpressionActionsSettings actions_settings;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
TemporaryFileLookupPtr temporary_file_lookup;

const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);

@ -280,9 +280,9 @@ void SortingStep::mergeSorting(
if (increase_sort_description_compile_attempts)
increase_sort_description_compile_attempts = false;

auto tmp_data_on_disk = sort_settings.tmp_data
? std::make_unique<TemporaryDataOnDisk>(sort_settings.tmp_data, CurrentMetrics::TemporaryFilesForSort)
: std::unique_ptr<TemporaryDataOnDisk>();
TemporaryDataOnDiskScopePtr tmp_data_on_disk = nullptr;
if (sort_settings.tmp_data)
tmp_data_on_disk = sort_settings.tmp_data->childScope(CurrentMetrics::TemporaryFilesForSort);

return std::make_shared<MergeSortingTransform>(
header,
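A hedged sketch of the new call shape for consumers that spill (mirroring the hunk above; parent_scope is assumed to come from the query context):

TemporaryDataOnDiskScopePtr tmp_data_on_disk;
if (parent_scope)
    tmp_data_on_disk = parent_scope->childScope(CurrentMetrics::TemporaryFilesForSort);

/// Each spilled stream is then created directly against the scope:
/// TemporaryBlockStreamHolder stream(header, tmp_data_on_disk.get(), reserve_size);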
@ -54,9 +54,9 @@ namespace
class SourceFromNativeStream : public ISource
{
public:
explicit SourceFromNativeStream(TemporaryFileStream * tmp_stream_)
: ISource(tmp_stream_->getHeader())
, tmp_stream(tmp_stream_)
explicit SourceFromNativeStream(const Block & header, TemporaryBlockStreamReaderHolder tmp_stream_)
: ISource(header)
, tmp_stream(std::move(tmp_stream_))
{}

String getName() const override { return "SourceFromNativeStream"; }
@ -69,7 +69,7 @@ namespace
auto block = tmp_stream->read();
if (!block)
{
tmp_stream = nullptr;
tmp_stream.reset();
return {};
}
return convertToChunk(block);
@ -78,7 +78,7 @@ namespace
std::optional<ReadProgress> getReadProgress() override { return std::nullopt; }

private:
TemporaryFileStream * tmp_stream;
TemporaryBlockStreamReaderHolder tmp_stream;
};
}

@ -811,15 +811,18 @@ void AggregatingTransform::initGenerate()

Pipes pipes;
/// Merge external data from all aggregators used in query.
for (const auto & aggregator : *params->aggregator_list_ptr)
for (auto & aggregator : *params->aggregator_list_ptr)
{
const auto & tmp_data = aggregator.getTemporaryData();
for (auto * tmp_stream : tmp_data.getStreams())
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
auto & tmp_data = aggregator.getTemporaryData();
num_streams += tmp_data.size();

num_streams += tmp_data.getStreams().size();
compressed_size += tmp_data.getStat().compressed_size;
uncompressed_size += tmp_data.getStat().uncompressed_size;
for (auto & tmp_stream : tmp_data)
{
auto stat = tmp_stream.finishWriting();
compressed_size += stat.compressed_size;
uncompressed_size += stat.uncompressed_size;
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream.getHeader(), tmp_stream.getReadStream())));
}
}

LOG_DEBUG(
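For orientation, the source's per-call drain step, consolidated from the two hunks above into one function-shaped sketch (convertToChunk as in the original file):

Chunk generateOnce(TemporaryBlockStreamReaderHolder & tmp_stream)
{
    auto block = tmp_stream->read();
    if (!block)
    {
        tmp_stream.reset();   /// drop the reader and its buffers as soon as the data ends
        return {};
    }
    return convertToChunk(block);
}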
@ -27,15 +27,20 @@ namespace ProfileEvents
namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

class BufferingToFileTransform : public IAccumulatingTransform
{
public:
BufferingToFileTransform(const Block & header, TemporaryFileStream & tmp_stream_, LoggerPtr log_)
BufferingToFileTransform(const Block & header, TemporaryBlockStreamHolder tmp_stream_, LoggerPtr log_)
: IAccumulatingTransform(header, header)
, tmp_stream(tmp_stream_)
, tmp_stream(std::move(tmp_stream_))
, log(log_)
{
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.getPath());
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.getHolder()->describeFilePath());
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
}

@ -44,14 +49,15 @@ public:
void consume(Chunk chunk) override
{
Block block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
tmp_stream.write(block);
tmp_stream->write(block);
}

Chunk generate() override
{
if (!tmp_stream.isWriteFinished())
if (!tmp_read_stream)
{
auto stat = tmp_stream.finishWriting();
tmp_read_stream = tmp_stream.getReadStream();

ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
@ -59,10 +65,11 @@ public:
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size);

LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
tmp_stream.getPath(), ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
tmp_stream.getHolder()->describeFilePath(),
ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
}

Block block = tmp_stream.read();
Block block = tmp_read_stream->read();
if (!block)
return {};

@ -71,7 +78,8 @@ public:
}

private:
TemporaryFileStream & tmp_stream;
TemporaryBlockStreamHolder tmp_stream;
TemporaryBlockStreamReaderHolder tmp_read_stream;

LoggerPtr log;
};
@ -86,7 +94,7 @@ MergeSortingTransform::MergeSortingTransform(
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
TemporaryDataOnDiskPtr tmp_data_,
TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_)
: SortingTransform(header, description_, max_merged_block_size_, limit_, increase_sort_description_compile_attempts)
, max_bytes_before_remerge(max_bytes_before_remerge_)
@ -168,9 +176,13 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
if (!tmp_data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDisk is not set for MergeSortingTransform");
temporary_files_num++;

/// If there's less free disk space than reserve_size, an exception will be thrown
size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
auto & tmp_stream = tmp_data->createStream(header_without_constants, reserve_size);
TemporaryBlockStreamHolder tmp_stream(header_without_constants, tmp_data.get(), reserve_size);
size_t max_merged_block_size = this->max_merged_block_size;
if (max_block_bytes > 0 && sum_rows_in_blocks > 0 && sum_bytes_in_blocks > 0)
{
@ -179,7 +191,7 @@ void MergeSortingTransform::consume(Chunk chunk)
max_merged_block_size = std::max(std::min(max_merged_block_size, max_block_bytes / avg_row_bytes), 128UL);
}
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, std::move(tmp_stream), log);

processors.emplace_back(current_processor);

@ -221,14 +233,14 @@ void MergeSortingTransform::generate()
{
if (!generated_prefix)
{
size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0;
if (num_tmp_files == 0)
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
if (temporary_files_num == 0)
{
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
}
else
{
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are {} temporary sorted parts to merge", num_tmp_files);
LOG_INFO(log, "There are {} temporary sorted parts to merge", temporary_files_num);

processors.emplace_back(std::make_shared<MergeSorterSource>(
header_without_constants, std::move(chunks), description, max_merged_block_size, limit));
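Condensed, BufferingToFileTransform is now an accumulate-then-replay state machine; a sketch under the same assumption the code above relies on, namely that an empty reader holder means the file has not been sealed yet:

/// Condensed sketch of the spill lifecycle (profile counters and logging elided).
struct SpillBuffer
{
    TemporaryBlockStreamHolder writer;        /// owns the temporary file
    TemporaryBlockStreamReaderHolder reader;  /// empty until writing is finished

    void consume(const Block & block) { writer->write(block); }

    Block next()
    {
        if (!reader)                          /// first call: seal the file, open a reader
        {
            writer.finishWriting();
            reader = writer.getReadStream();
        }
        return reader->read();                /// empty Block when the replay is done
    }
};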
@ -29,7 +29,7 @@ public:
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
TemporaryDataOnDiskPtr tmp_data_,
TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_);

String getName() const override { return "MergeSortingTransform"; }
@ -45,7 +45,8 @@ private:
size_t max_bytes_before_remerge;
double remerge_lowered_memory_bytes_ratio;
size_t max_bytes_before_external_sort;
TemporaryDataOnDiskPtr tmp_data;
TemporaryDataOnDiskScopePtr tmp_data;
size_t temporary_files_num = 0;
size_t min_free_disk_space;
size_t max_block_bytes;

@ -197,6 +197,12 @@ public:
void setQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { resources.query_id_holders.emplace_back(std::move(query_id_holder)); }
void addContext(ContextPtr context) { resources.interpreter_context.emplace_back(std::move(context)); }

template <typename Resource>
void addResource(Resource resource, std::vector<Resource> QueryPlanResourceHolder::*field)
{
(resources.*field).push_back(std::move(resource));
}

/// Convert query pipeline to pipe.
static Pipe getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources);
static QueryPipeline getPipeline(QueryPipelineBuilder builder);
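addResource is a small pointer-to-member utility: the caller names which vector inside QueryPlanResourceHolder should keep a resource alive for the pipeline's lifetime. Usage as it appears later in this commit for the vertical-merge rows_sources buffer:

builder->addResource<std::shared_ptr<TemporaryDataBuffer>>(
    ctx->rows_sources_temporary_file,
    &QueryPlanResourceHolder::rows_sources_temporary_file);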
@ -13,6 +13,7 @@ class QueryPlan;
class Context;

struct QueryIdHolder;
class TemporaryDataBuffer;

struct QueryPlanResourceHolder
{
@ -33,6 +34,7 @@ struct QueryPlanResourceHolder
std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks;
std::vector<std::shared_ptr<QueryIdHolder>> query_id_holders;
std::vector<std::shared_ptr<TemporaryDataBuffer>> rows_sources_temporary_file;
};

}

@ -162,15 +162,16 @@ void HTTPHandler::pushDelayedResults(Output & used_output)

for (auto & write_buf : write_buffers)
{
if (!write_buf)
continue;

IReadableWriteBuffer * write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get());
if (write_buf_concrete)
if (auto * write_buf_concrete = dynamic_cast<TemporaryDataBuffer *>(write_buf.get()))
{
ReadBufferPtr reread_buf = write_buf_concrete->tryGetReadBuffer();
if (reread_buf)
read_buffers.emplace_back(wrapReadBufferPointer(reread_buf));
if (auto reread_buf = write_buf_concrete->read())
read_buffers.emplace_back(std::move(reread_buf));
}

if (auto * write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get()))
{
if (auto reread_buf = write_buf_concrete->tryGetReadBuffer())
read_buffers.emplace_back(std::move(reread_buf));
}
}

@ -312,21 +313,19 @@ void HTTPHandler::processQuery(

if (buffer_size_memory > 0 || buffer_until_eof)
{
CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2;
CascadeWriteBuffer::WriteBufferPtrs cascade_buffers;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffers_lazy;

if (buffer_size_memory > 0)
cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));
cascade_buffers.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));

if (buffer_until_eof)
{
auto tmp_data = std::make_shared<TemporaryDataOnDisk>(server.context()->getTempDataOnDisk());

auto create_tmp_disk_buffer = [tmp_data] (const WriteBufferPtr &) -> WriteBufferPtr {
return tmp_data->createRawStream();
};

cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));
auto tmp_data = server.context()->getTempDataOnDisk();
cascade_buffers_lazy.emplace_back([tmp_data](const WriteBufferPtr &) -> WriteBufferPtr
{
return std::make_unique<TemporaryDataBuffer>(tmp_data.get());
});
}
else
{
@ -342,10 +341,10 @@ void HTTPHandler::processQuery(
return next_buffer;
};

cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
cascade_buffers_lazy.emplace_back(push_memory_buffer_and_continue);
}

used_output.out_delayed_and_compressed_holder = std::make_unique<CascadeWriteBuffer>(std::move(cascade_buffer1), std::move(cascade_buffer2));
used_output.out_delayed_and_compressed_holder = std::make_unique<CascadeWriteBuffer>(std::move(cascade_buffers), std::move(cascade_buffers_lazy));
used_output.out_maybe_delayed_and_compressed = used_output.out_delayed_and_compressed_holder.get();
}
else
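The cascade keeps the response in memory first and only materializes a disk-backed TemporaryDataBuffer if the memory stage overflows. A reduced sketch of the two-stage construction (names from the hunk; the else-branch and error handling omitted):

CascadeWriteBuffer::WriteBufferPtrs cascade_buffers;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffers_lazy;

/// Stage 1: bounded in-memory buffer, created eagerly.
cascade_buffers.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));

/// Stage 2: disk spill, constructed lazily only when stage 1 overflows.
cascade_buffers_lazy.emplace_back([tmp_data](const WriteBufferPtr &) -> WriteBufferPtr
{
    return std::make_unique<TemporaryDataBuffer>(tmp_data.get());
});

auto cascade = std::make_unique<CascadeWriteBuffer>(std::move(cascade_buffers), std::move(cascade_buffers_lazy));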
@ -118,68 +118,6 @@ static ColumnsStatistics getStatisticsForColumns(
return all_statistics;
}

/// Manages the "rows_sources" temporary file that is used during vertical merge.
class RowsSourcesTemporaryFile : public ITemporaryFileLookup
{
public:
/// A logical name of the temporary file under which it will be known to the plan steps that use it.
static constexpr auto FILE_ID = "rows_sources";

explicit RowsSourcesTemporaryFile(TemporaryDataOnDiskScopePtr temporary_data_on_disk_)
: tmp_disk(std::make_unique<TemporaryDataOnDisk>(temporary_data_on_disk_))
, uncompressed_write_buffer(tmp_disk->createRawStream())
, tmp_file_name_on_disk(uncompressed_write_buffer->getFileName())
{
}

WriteBuffer & getTemporaryFileForWriting(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);

if (write_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there must be only one writer");

write_buffer = (std::make_unique<CompressedWriteBuffer>(*uncompressed_write_buffer));
return *write_buffer;
}

std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);

if (!finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file is not finalized yet");

/// tmp_disk might not create real file if no data was written to it.
if (final_size == 0)
return std::make_unique<ReadBufferFromEmptyFile>();

/// Reopen the file for each read so that multiple reads can be performed in parallel and there is no need to seek to the beginning.
auto raw_file_read_buffer = std::make_unique<ReadBufferFromFile>(tmp_file_name_on_disk);
return std::make_unique<CompressedReadBufferFromFile>(std::move(raw_file_read_buffer));
}

/// Returns written data size in bytes
size_t finalizeWriting()
{
write_buffer->finalize();
uncompressed_write_buffer->finalize();
finalized = true;
final_size = write_buffer->count();
return final_size;
}

private:
std::unique_ptr<TemporaryDataOnDisk> tmp_disk;
std::unique_ptr<WriteBufferFromFileBase> uncompressed_write_buffer;
std::unique_ptr<WriteBuffer> write_buffer;
const String tmp_file_name_on_disk;
bool finalized = false;
size_t final_size = 0;
};

static void addMissedColumnsToSerializationInfos(
size_t num_rows_in_parts,
const Names & part_columns,
@ -480,7 +418,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
case MergeAlgorithm::Vertical:
{
ctx->rows_sources_temporary_file = std::make_shared<RowsSourcesTemporaryFile>(global_ctx->context->getTempDataOnDisk());
ctx->rows_sources_temporary_file = std::make_unique<TemporaryDataBuffer>(global_ctx->context->getTempDataOnDisk().get());

std::map<String, UInt64> local_merged_column_to_size;
for (const auto & part : global_ctx->future_part->parts)
@ -854,22 +792,11 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
return false;

size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
size_t input_rows_filtered = *global_ctx->input_rows_filtered;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_columns.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);

/// Ensure data has been written to disk.
size_t rows_sources_count = ctx->rows_sources_temporary_file->finalizeWriting();
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1) && sum_input_rows_exact != rows_sources_count + input_rows_filtered)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
ctx->rows_sources_temporary_file->finishWriting();

ctx->it_name_and_type = global_ctx->gathering_columns.cbegin();

@ -901,12 +828,12 @@ class ColumnGathererStep : public ITransformingStep
public:
ColumnGathererStep(
const Header & input_header_,
const String & rows_sources_temporary_file_name_,
std::unique_ptr<ReadBuffer> rows_sources_read_buf_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_header_, input_header_, getTraits())
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, rows_sources_read_buf(std::move(rows_sources_read_buf_))
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, is_result_sparse(is_result_sparse_)
@ -914,15 +841,13 @@ public:

String getName() const override { return "ColumnGatherer"; }

void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* pipeline_settings */) override
{
const auto &header = pipeline.getHeader();
const auto & header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();

if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");

auto rows_sources_read_buf = pipeline_settings.temporary_file_lookup->getTemporaryFileForReading(rows_sources_temporary_file_name);
if (!rows_sources_read_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary data buffer for rows sources is not set");

auto transform = std::make_unique<ColumnGathererTransform>(
header,
@ -957,7 +882,7 @@ private:
}

MergeTreeData::MergingParams merging_params{};
const String rows_sources_temporary_file_name;
std::unique_ptr<ReadBuffer> rows_sources_read_buf;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool is_result_sparse;
@ -1008,7 +933,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
merge_column_query_plan.getCurrentHeader(),
RowsSourcesTemporaryFile::FILE_ID,
ctx->rows_sources_temporary_file->read(),
(*data_settings)[MergeTreeSetting::merge_max_block_size],
(*data_settings)[MergeTreeSetting::merge_max_block_size_bytes],
is_result_sparse);
@ -1037,9 +962,9 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
}

auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
builder->addResource<std::shared_ptr<TemporaryDataBuffer>>(ctx->rows_sources_temporary_file, &QueryPlanResourceHolder::rows_sources_temporary_file);

return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
}
@ -1401,7 +1326,7 @@ public:
const SortDescription & sort_description_,
const Names partition_key_columns_,
const MergeTreeData::MergingParams & merging_params_,
const String & rows_sources_temporary_file_name_,
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
@ -1411,7 +1336,7 @@ public:
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, rows_sources_temporary_file(rows_sources_temporary_file_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
@ -1421,7 +1346,7 @@ public:

String getName() const override { return "MergeParts"; }

void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* pipeline_settings */) override
{
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
@ -1431,14 +1356,6 @@ public:
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();

WriteBuffer * rows_sources_write_buf = nullptr;
if (!rows_sources_temporary_file_name.empty())
{
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
rows_sources_write_buf = &pipeline_settings.temporary_file_lookup->getTemporaryFileForWriting(rows_sources_temporary_file_name);
}

switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
@ -1451,14 +1368,14 @@ public:
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
rows_sources_write_buf,
rows_sources_temporary_file,
blocks_are_granules_size);
break;

case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, input_streams_count, sort_description, merging_params.sign_column, false,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, rows_sources_temporary_file, blocks_are_granules_size);
break;

case MergeTreeData::MergingParams::Summing:
@ -1473,7 +1390,7 @@ public:
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, input_streams_count, sort_description, merging_params.is_deleted_column, merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size,
merge_block_size_rows, merge_block_size_bytes, rows_sources_temporary_file, blocks_are_granules_size,
cleanup);
break;

@ -1486,7 +1403,7 @@ public:
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, input_streams_count, sort_description, merging_params.sign_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, rows_sources_temporary_file, blocks_are_granules_size);
break;
}

@ -1528,7 +1445,7 @@ private:
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
const String rows_sources_temporary_file_name;
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool blocks_are_granules_size;
@ -1697,7 +1614,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
sort_description,
partition_key_columns,
global_ctx->merging_params,
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources temporary file is used only for vertical merge
(is_vertical_merge ? ctx->rows_sources_temporary_file : nullptr), /// rows_sources temporary file is used only for vertical merge
(*data_settings)[MergeTreeSetting::merge_max_block_size],
(*data_settings)[MergeTreeSetting::merge_max_block_size_bytes],
ctx->blocks_are_granules_size,
@ -1762,7 +1679,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const

{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);

@ -41,7 +41,6 @@ namespace DB

class MergeTask;
using MergeTaskPtr = std::shared_ptr<MergeTask>;
class RowsSourcesTemporaryFile;

/**
* Overview of the merge algorithm
@ -235,7 +234,7 @@ private:
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes{};

/// For projections to rebuild
@ -314,7 +313,7 @@ private:
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
{
/// Begin dependencies from previous stage
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes;
CompressionCodecPtr compression_codec;
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
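Putting the MergeTask pieces together: the rows_sources bookkeeping is now a single TemporaryDataBuffer shared between the two stages, written by the horizontal merge and replayed by the column gatherer. A hedged end-to-end sketch (scope stands for the context's temporary-data scope; setUpMergedTransform is a placeholder for the merging-mode switch shown above):

auto rows_sources = std::make_shared<TemporaryDataBuffer>(scope.get());

/// Horizontal stage: the merging transform writes one RowSourcePart per emitted row;
/// shared ownership keeps the buffer alive across both stages.
setUpMergedTransform(rows_sources);

/// Vertical stage: seal the buffer, then hand a fresh ReadBuffer to the gatherer step.
rows_sources->finishWriting();
std::unique_ptr<ReadBuffer> rows_sources_read_buf = rows_sources->read();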
@ -113,10 +113,11 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
}

if (!prewhere_actions.steps.empty())
LOG_TRACE(log, "PREWHERE condition was split into {} steps: {}", prewhere_actions.steps.size(), prewhere_actions.dumpConditions());
LOG_TRACE(log, "PREWHERE condition was split into {} steps", prewhere_actions.steps.size());

if (prewhere_info)
LOG_TEST(log, "Original PREWHERE DAG:\n{}\nPREWHERE actions:\n{}",
LOG_TEST(log, "Original PREWHERE DAG:{}\n{}\nPREWHERE actions:\n{}",
prewhere_actions.dumpConditions(),
prewhere_info->prewhere_actions.dumpDAG(),
(!prewhere_actions.steps.empty() ? prewhere_actions.dump() : std::string("<nullptr>")));
}