#include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int TOO_MANY_ROWS_OR_BYTES; extern const int LOGICAL_ERROR; extern const int NOT_ENOUGH_SPACE; } void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta) { if (parent) parent->deltaAllocAndCheck(compressed_delta, uncompressed_delta); /// check that we don't go negative if ((compressed_delta < 0 && stat.compressed_size < static_cast(-compressed_delta)) || (uncompressed_delta < 0 && stat.uncompressed_size < static_cast(-uncompressed_delta))) { throw Exception(ErrorCodes::LOGICAL_ERROR, "Negative temporary data size"); } size_t new_consumprion = stat.compressed_size + compressed_delta; if (compressed_delta > 0 && limit && new_consumprion > limit) throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, "Limit for temporary files size exceeded (would consume {} / {} bytes)", new_consumprion, limit); stat.compressed_size += compressed_delta; stat.uncompressed_size += uncompressed_delta; } TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size) { if (file_cache) return createStreamToCacheFile(header, max_file_size); else if (volume) return createStreamToRegularFile(header, max_file_size); throw Exception("TemporaryDataOnDiskScope has no cache and no volume", ErrorCodes::LOGICAL_ERROR); } TemporaryFileStream & TemporaryDataOnDisk::createStreamToCacheFile(const Block & header, size_t max_file_size) { if (!file_cache) throw Exception("TemporaryDataOnDiskScope has no cache", ErrorCodes::LOGICAL_ERROR); auto holder = file_cache->set(FileSegment::Key::random(), 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true)); std::lock_guard lock(mutex); TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique(std::move(holder), header, this)); return *tmp_stream; } TemporaryFileStream & TemporaryDataOnDisk::createStreamToRegularFile(const Block & header, size_t max_file_size) { if (!volume) throw Exception("TemporaryDataOnDiskScope has no volume", ErrorCodes::LOGICAL_ERROR); DiskPtr disk; if (max_file_size > 0) { auto reservation = volume->reserve(max_file_size); if (!reservation) throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE); disk = reservation->getDisk(); } else { disk = volume->getDisk(); } auto tmp_file = std::make_unique(disk, current_metric_scope); std::lock_guard lock(mutex); TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique(std::move(tmp_file), header, this)); return *tmp_stream; } std::vector TemporaryDataOnDisk::getStreams() const { std::vector res; std::lock_guard lock(mutex); res.reserve(streams.size()); for (const auto & stream : streams) res.push_back(stream.get()); return res; } bool TemporaryDataOnDisk::empty() const { std::lock_guard lock(mutex); return streams.empty(); } struct TemporaryFileStream::OutputWriter { OutputWriter(const String & path, const Block & header_) : out_buf(std::make_unique(path)) , out_compressed_buf(*out_buf) , out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_) { LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to {}", path); } OutputWriter(std::unique_ptr out_buf_, const Block & header_) : out_buf(std::move(out_buf_)) , out_compressed_buf(*out_buf) , out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_) { LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to {}", static_cast(out_buf.get())->getFileName()); } size_t write(const Block & block) { if (finalized) throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR); size_t written_bytes = out_writer.write(block); num_rows += block.rows(); return written_bytes; } void flush() { if (finalized) throw Exception("Cannot flush finalized stream", ErrorCodes::LOGICAL_ERROR); out_compressed_buf.next(); out_buf->next(); out_writer.flush(); } void finalize() { if (finalized) return; /// if we called finalize() explicitly, and got an exception, /// we don't want to get it again in the destructor, so set finalized flag first finalized = true; out_writer.flush(); out_compressed_buf.finalize(); out_buf->finalize(); } ~OutputWriter() { try { finalize(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } } std::unique_ptr out_buf; CompressedWriteBuffer out_compressed_buf; NativeWriter out_writer; std::atomic_size_t num_rows = 0; bool finalized = false; }; struct TemporaryFileStream::InputReader { InputReader(const String & path, const Block & header_) : in_file_buf(path) , in_compressed_buf(in_file_buf) , in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION) { LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path); } explicit InputReader(const String & path) : in_file_buf(path) , in_compressed_buf(in_file_buf) , in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION) { LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Reading from {}", path); } Block read() { return in_reader.read(); } ReadBufferFromFile in_file_buf; CompressedReadBuffer in_compressed_buf; NativeReader in_reader; }; TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_) : parent(parent_) , header(header_) , file(std::move(file_)) , out_writer(std::make_unique(file->getPath(), header)) { } TemporaryFileStream::TemporaryFileStream(FileSegmentsHolder && segments_, const Block & header_, TemporaryDataOnDisk * parent_) : parent(parent_) , header(header_) , segment_holder(std::move(segments_)) { if (segment_holder.file_segments.size() != 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream can be created only from single segment"); auto & segment = segment_holder.file_segments.front(); auto out_buf = std::make_unique(segment.get()); out_writer = std::make_unique(std::move(out_buf), header); } size_t TemporaryFileStream::write(const Block & block) { if (!out_writer) throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR); updateAllocAndCheck(); size_t bytes_written = out_writer->write(block); return bytes_written; } void TemporaryFileStream::flush() { if (!out_writer) throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR); out_writer->flush(); } TemporaryFileStream::Stat TemporaryFileStream::finishWriting() { if (isWriteFinished()) return stat; if (out_writer) { out_writer->finalize(); /// The amount of written data can be changed after finalization, some buffers can be flushed /// Need to update the stat updateAllocAndCheck(); out_writer.reset(); /// reader will be created at the first read call, not to consume memory before it is needed } return stat; } bool TemporaryFileStream::isWriteFinished() const { assert(in_reader == nullptr || out_writer == nullptr); return out_writer == nullptr; } Block TemporaryFileStream::read() { if (!isWriteFinished()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished"); if (isEof()) return {}; if (!in_reader) { in_reader = std::make_unique(getPath(), header); } Block block = in_reader->read(); if (!block) { /// finalize earlier to release resources, do not wait for the destructor this->release(); } return block; } void TemporaryFileStream::updateAllocAndCheck() { assert(out_writer); size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes(); size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes(); if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size)) { throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}", getPath(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size); } parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size); stat.compressed_size = new_compressed_size; stat.uncompressed_size = new_uncompressed_size; stat.num_rows = out_writer->num_rows; } bool TemporaryFileStream::isEof() const { return file == nullptr && segment_holder.empty(); } void TemporaryFileStream::release() { if (in_reader) in_reader.reset(); if (out_writer) { out_writer->finalize(); out_writer.reset(); } if (file) { file.reset(); parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size); } if (!segment_holder.empty()) segment_holder.reset(); } String TemporaryFileStream::getPath() const { if (file) return file->getPath(); if (!segment_holder.file_segments.empty()) return segment_holder.file_segments.front()->getPathInLocalCache(); throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryFileStream has no file"); } TemporaryFileStream::~TemporaryFileStream() { try { release(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); assert(false); /// deltaAllocAndCheck with negative can't throw exception } } }