This commit is contained in:
vdimir 2024-10-17 15:10:39 +00:00
parent b09d3c5479
commit f238530cc5
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
8 changed files with 33 additions and 26 deletions

View File

@ -1639,6 +1639,11 @@ Block Aggregator::convertOneBucketToBlock(AggregatedDataVariants & variants, Are
return block; return block;
} }
std::vector<TemporaryBlockStreamHolder> & Aggregator::getTemporaryData()
{
return tmp_files;
}
template <typename Method> template <typename Method>
void Aggregator::writeToTemporaryFileImpl( void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,

View File

@ -311,7 +311,7 @@ public:
bool hasTemporaryData() const { return !tmp_files.empty(); } bool hasTemporaryData() const { return !tmp_files.empty(); }
std::vector<TemporaryBlockStreamHolder> & getTemporaryData() { return tmp_files; } std::vector<TemporaryBlockStreamHolder> & getTemporaryData();
/// Get data structure of the result. /// Get data structure of the result.
Block getHeader(bool final) const; Block getHeader(bool final) const;

View File

@ -389,8 +389,8 @@ void GraceHashJoin::addBuckets(const size_t bucket_count)
for (size_t i = 0; i < bucket_count; ++i) for (size_t i = 0; i < bucket_count; ++i)
try try
{ {
TemporaryBlockStreamHolder left_file = TemporaryBlockStreamHolder(left_sample_block, tmp_data.get()); TemporaryBlockStreamHolder left_file(left_sample_block, tmp_data.get());
TemporaryBlockStreamHolder right_file = TemporaryBlockStreamHolder(prepareRightBlock(right_sample_block), tmp_data.get()); TemporaryBlockStreamHolder right_file(prepareRightBlock(right_sample_block), tmp_data.get());
BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, std::move(left_file), std::move(right_file), log); BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, std::move(left_file), std::move(right_file), log);
tmp_buckets.emplace_back(std::move(new_bucket)); tmp_buckets.emplace_back(std::move(new_bucket));

View File

@ -59,7 +59,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
{ {
size_t left_position; size_t left_position;
size_t right_block; size_t right_block;
TemporaryBlockStreamReaderHolder reader; std::optional<TemporaryBlockStreamReaderHolder> reader;
}; };
@ -513,9 +513,9 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|| (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join))) || (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
{ {
if (!tmp_stream) if (!tmp_stream)
tmp_stream = TemporaryBlockStreamHolder(right_sample_block, tmp_data.get()); tmp_stream.emplace(right_sample_block, tmp_data.get());
tmp_stream->write(block_to_save); tmp_stream.value()->write(block_to_save);
return true; return true;
} }
@ -721,13 +721,14 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
{ {
size_t start_left_row = 0; size_t start_left_row = 0;
size_t start_right_block = 0; size_t start_right_block = 0;
TemporaryBlockStreamReaderHolder reader; std::optional<TemporaryBlockStreamReaderHolder> reader;
if (not_processed) if (not_processed)
{ {
auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed); auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed);
start_left_row = continuation.left_position; start_left_row = continuation.left_position;
start_right_block = continuation.right_block; start_right_block = continuation.right_block;
reader = std::move(continuation.reader); if (continuation.reader)
reader = std::move(*continuation.reader);
not_processed.reset(); not_processed.reset();
} }
@ -796,9 +797,9 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (tmp_stream && rows_added <= max_joined_block_rows) if (tmp_stream && rows_added <= max_joined_block_rows)
{ {
if (!reader) if (!reader)
reader = tmp_stream.getReadStream(); reader = tmp_stream->getReadStream();
while (auto block_right = reader->read()) while (auto block_right = reader.value()->read())
{ {
++block_number; ++block_number;
process_right_block(block_right); process_right_block(block_right);

View File

@ -424,7 +424,7 @@ private:
/// Needed to do external cross join /// Needed to do external cross join
TemporaryDataOnDiskScopePtr tmp_data; TemporaryDataOnDiskScopePtr tmp_data;
TemporaryBlockStreamHolder tmp_stream; std::optional<TemporaryBlockStreamHolder> tmp_stream;
mutable std::once_flag finish_writing; mutable std::once_flag finish_writing;
/// Block with columns from the right-side table. /// Block with columns from the right-side table.

View File

@ -114,18 +114,19 @@ template <typename Impl, typename Holder>
class WrapperGuard class WrapperGuard
{ {
public: public:
WrapperGuard() = default;
template <typename ... Args> template <typename ... Args>
WrapperGuard(std::unique_ptr<Holder> holder_, Args && ... args) WrapperGuard(std::unique_ptr<Holder> holder_, Args && ... args)
: holder(std::move(holder_)) : holder(std::move(holder_))
, impl(std::make_unique<Impl>(*holder, std::forward<Args>(args)...)) , impl(std::make_unique<Impl>(*holder, std::forward<Args>(args)...))
{} {
chassert(holder);
chassert(impl);
}
Impl * operator->() { return impl.get(); } Impl * operator->() { chassert(impl); chassert(holder); return impl.get(); }
const Impl * operator->() const { return impl.get(); } const Impl * operator->() const { chassert(impl); chassert(holder); return impl.get(); }
Impl & operator*() { return *impl; } Impl & operator*() { chassert(impl); chassert(holder); return *impl; }
const Impl & operator*() const { return *impl; } const Impl & operator*() const { chassert(impl); chassert(holder); return *impl; }
operator bool() const { return impl != nullptr; } operator bool() const { return impl != nullptr; }
const Holder * getHolder() const { return holder.get(); } const Holder * getHolder() const { return holder.get(); }
@ -153,13 +154,13 @@ public:
virtual std::unique_ptr<WriteBuffer> write() = 0; virtual std::unique_ptr<WriteBuffer> write() = 0;
virtual std::unique_ptr<ReadBuffer> read(size_t buffer_size) const = 0; virtual std::unique_ptr<ReadBuffer> read(size_t buffer_size) const = 0;
/// Get location for logging purposes /// Get location for logging
virtual String describeFilePath() const = 0; virtual String describeFilePath() const = 0;
virtual ~TemporaryFileHolder() = default; virtual ~TemporaryFileHolder() = default;
}; };
/// Reads raw data from temporary file
class TemporaryDataReadBuffer : public ReadBuffer class TemporaryDataReadBuffer : public ReadBuffer
{ {
public: public:
@ -173,7 +174,7 @@ private:
WrapperGuard<CompressedReadBuffer, ReadBuffer> compressed_buf; WrapperGuard<CompressedReadBuffer, ReadBuffer> compressed_buf;
}; };
/// Writes data to buffer provided by file_holder, and accounts amount of written data in parent scope. /// Writes raw data to buffer provided by file_holder, and accounts amount of written data in parent scope.
class TemporaryDataBuffer : public WriteBuffer class TemporaryDataBuffer : public WriteBuffer
{ {
public: public:
@ -206,13 +207,13 @@ private:
Stat stat; Stat stat;
}; };
/// High level interfaces for reading and writing temporary data by blocks.
using TemporaryBlockStreamReaderHolder = WrapperGuard<NativeReader, ReadBuffer>; using TemporaryBlockStreamReaderHolder = WrapperGuard<NativeReader, ReadBuffer>;
class TemporaryBlockStreamHolder : public WrapperGuard<NativeWriter, TemporaryDataBuffer> class TemporaryBlockStreamHolder : public WrapperGuard<NativeWriter, TemporaryDataBuffer>
{ {
public: public:
TemporaryBlockStreamHolder() = default;
TemporaryBlockStreamHolder(const Block & header_, TemporaryDataOnDiskScope * parent_, size_t max_file_size = 0); TemporaryBlockStreamHolder(const Block & header_, TemporaryDataOnDiskScope * parent_, size_t max_file_size = 0);
TemporaryBlockStreamReaderHolder getReadStream() const; TemporaryBlockStreamReaderHolder getReadStream() const;

View File

@ -69,7 +69,7 @@ public:
ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size))); ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
} }
Block block = tmp_read_stream->read(); Block block = tmp_read_stream.value()->read();
if (!block) if (!block)
return {}; return {};
@ -79,7 +79,7 @@ public:
private: private:
TemporaryBlockStreamHolder tmp_stream; TemporaryBlockStreamHolder tmp_stream;
TemporaryBlockStreamReaderHolder tmp_read_stream; std::optional<TemporaryBlockStreamReaderHolder> tmp_read_stream;
LoggerPtr log; LoggerPtr log;
}; };

View File

@ -964,7 +964,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context); auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings); auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
builder->addResource<std::shared_ptr<TemporaryDataBuffer>>(ctx->rows_sources_temporary_file, &QueryPlanResourceHolder::rows_sources_temporary_file); builder->addResource(ctx->rows_sources_temporary_file, &QueryPlanResourceHolder::rows_sources_temporary_file);
return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)}; return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
} }