Add disk proxies back, add setReadUntilPosition to ReadBuffer

This commit is contained in:
kssenii 2021-10-18 18:35:11 +03:00
parent 3d39ea8e37
commit d5d4817350
13 changed files with 60 additions and 54 deletions

View File

@ -58,6 +58,8 @@ public:
profile_callback = profile_callback_;
clock_type = clock_type_;
}
void setReadUntilPosition(size_t position) override { file_in->setReadUntilPosition(position); }
};
}

View File

@ -14,6 +14,7 @@
#include <IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Disks/DiskRestartProxy.h>
namespace ProfileEvents
@ -108,13 +109,6 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
}
void CompressedReadBufferBase::setRightOffset(size_t offset)
{
if (auto * async_in = dynamic_cast<AsynchronousReadIndirectBufferFromRemoteFS *>(compressed_in))
async_in->setRightOffset(offset);
}
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy)

View File

@ -60,12 +60,6 @@ public:
disable_checksum = true;
}
/**
* For asynchronous range reading from remote fs need to update last offset for current task,
* when newer tasks read behind previous task last mark.
*/
void setRightOffset(size_t offset);
public:
CompressionCodecPtr codec;
};

View File

@ -63,6 +63,7 @@ public:
file_in.setProfileCallback(profile_callback_, clock_type_);
}
void setReadUntilPosition(size_t position) override { file_in.setReadUntilPosition(position); }
};
}

View File

@ -1,7 +1,7 @@
#include "DiskRestartProxy.h"
#include <IO/ReadBufferFromFileDecorator.h>
#include <IO/WriteBufferFromFileDecorator.h>
#include <IO/RestartAwareReadBuffer.h>
namespace DB
{
@ -13,19 +13,6 @@ namespace ErrorCodes
using Millis = std::chrono::milliseconds;
using Seconds = std::chrono::seconds;
/// Holds restart read lock till buffer destruction.
class RestartAwareReadBuffer : public ReadBufferFromFileDecorator
{
public:
RestartAwareReadBuffer(const DiskRestartProxy & disk, std::unique_ptr<ReadBufferFromFileBase> impl_)
: ReadBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
void prefetch() override { impl->prefetch(); }
private:
ReadLock lock;
};
/// Holds restart read lock till buffer finalize.
class RestartAwareWriteBuffer : public WriteBufferFromFileDecorator
{

View File

@ -166,7 +166,7 @@ void ReadBufferFromRemoteFSGather::seek(off_t offset)
}
void ReadBufferFromRemoteFSGather::setRightOffset(size_t offset)
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t offset)
{
assert(last_offset < offset);
current_buf.reset();

View File

@ -32,7 +32,7 @@ public:
void seek(off_t offset); /// SEEK_SET only.
void setRightOffset(size_t offset);
void setReadUntilPosition(size_t position) override;
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);

View File

@ -200,27 +200,27 @@ void registerDiskS3(DiskFactory & factory)
s3disk->startup();
// bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
// if (cache_enabled)
// {
// String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
if (cache_enabled)
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
// if (metadata_path == cache_path)
// throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
if (metadata_path == cache_path)
throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
// auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
// auto cache_file_predicate = [] (const String & path)
// {
// return path.ends_with("idx") // index files.
// || path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
// || path.ends_with("txt") || path.ends_with("dat");
// };
auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
// s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
// }
s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
}
return s3disk;
return std::make_shared<DiskRestartProxy>(s3disk);
};
factory.registerDiskType("s3", creator);
}

View File

@ -78,8 +78,10 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
return;
if (absolute_position > last_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
absolute_position, last_offset);
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}) {}",
absolute_position, last_offset, buffer_events);
}
/// Prefetch even in case hasPendingData() == true.
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
@ -90,12 +92,12 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
}
void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset)
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t offset)
{
buffer_events += "-- Set last offset " + toString(offset) + "--";
if (prefetch_future.valid())
{
std::cerr << buffer_events << std::endl;
LOG_DEBUG(&Poco::Logger::get("kssenii"), buffer_events);
/// TODO: Planning to put logical error here after more testing,
// because seems like future is never supposed to be valid at this point.
std::terminate();
@ -107,7 +109,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset)
}
last_offset = offset;
impl->setRightOffset(offset);
impl->setReadUntilPosition(offset);
}
@ -250,7 +252,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
prefetch_future.wait();
prefetch_future = {};
}
std::cerr << "Buffer events: " << buffer_events << std::endl;
LOG_DEBUG(&Poco::Logger::get("kssenii"), buffer_events);
}

View File

@ -47,7 +47,7 @@ public:
void prefetch() override;
void setRightOffset(size_t offset);
void setReadUntilPosition(size_t position) override;
private:
bool nextImpl() override;

View File

@ -202,6 +202,8 @@ public:
*/
virtual void prefetch() {}
virtual void setReadUntilPosition(size_t /* position */) {}
protected:
/// The number of bytes to ignore from the initial position of `working_buffer`
/// buffer. Apparently this is an additional out-parameter for nextImpl(),

View File

@ -0,0 +1,24 @@
#include <IO/ReadBufferFromFileDecorator.h>
#include <shared_mutex>
namespace DB
{
using ReadLock = std::shared_lock<std::shared_timed_mutex>;
/// Holds restart read lock till buffer destruction.
class RestartAwareReadBuffer : public ReadBufferFromFileDecorator
{
public:
RestartAwareReadBuffer(const DiskRestartProxy & disk, std::unique_ptr<ReadBufferFromFileBase> impl_)
: ReadBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
void prefetch() override { impl->prefetch(); }
void setReadUntilPosition(size_t position) override { impl->setReadUntilPosition(position); }
private:
ReadLock lock;
};
}

View File

@ -187,9 +187,9 @@ void MergeTreeReaderStream::adjustForRange(size_t left_mark, size_t right_mark)
{
last_right_offset = right_offset;
if (cached_buffer)
cached_buffer->setRightOffset(last_right_offset);
cached_buffer->setReadUntilPosition(last_right_offset);
if (non_cached_buffer)
non_cached_buffer->setRightOffset(last_right_offset);
non_cached_buffer->setReadUntilPosition(last_right_offset);
}
}