Revert "Reset downloader for cache file segment in TemporaryFileStream"

This reverts commit 07147771e6.
This commit is contained in:
vdimir 2023-04-04 14:22:33 +00:00
parent 07147771e6
commit 999a3889d0
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
2 changed files with 5 additions and 28 deletions

View File

@ -429,6 +429,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
{
std::unique_lock segment_lock(mutex);
assertNotDetachedUnlocked(segment_lock);
assertIsDownloaderUnlocked("reserve", segment_lock);

View File

@ -137,18 +137,11 @@ struct TemporaryFileStream::OutputWriter
static_cast<const WriteBufferToFileSegment *>(out_buf.get())->getFileName());
}
size_t write(const Block & block, bool flush = false)
size_t write(const Block & block)
{
if (finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized stream");
size_t written_bytes = out_writer.write(block);
if (flush)
{
out_compressed_buf.next();
out_buf->next();
out_writer.flush();
}
num_rows += block.rows();
return written_bytes;
}
@ -241,12 +234,9 @@ TemporaryFileStream::TemporaryFileStream(FileSegmentsHolder && segments_, const
{
if (segment_holder.file_segments.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream can be created only from single segment");
auto & file_segment = segment_holder.file_segments.front();
auto out_buf = std::make_unique<WriteBufferToFileSegment>(file_segment.get());
auto & segment = segment_holder.file_segments.front();
auto out_buf = std::make_unique<WriteBufferToFileSegment>(segment.get());
out_writer = std::make_unique<OutputWriter>(std::move(out_buf), header);
/// `write` can be called from different thread, so we need to reset downloader
file_segment->completePartAndResetDownloader();
}
size_t TemporaryFileStream::write(const Block & block)
@ -255,21 +245,7 @@ size_t TemporaryFileStream::write(const Block & block)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been finished");
updateAllocAndCheck();
size_t bytes_written = 0;
if (segment_holder.empty())
{
bytes_written = out_writer->write(block);
}
else
{
auto & file_segment = segment_holder.file_segments.front();
/// We need to reset downloader for each block,
/// without it `file_segment->reserve` can be called only by the same thread
file_segment->getOrSetDownloader();
bytes_written = out_writer->write(block, true);
file_segment->completePartAndResetDownloader();
}
size_t bytes_written = out_writer->write(block);
return bytes_written;
}