mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 09:40:49 +00:00
Remove useless code
This commit is contained in:
parent
fe1c153c6f
commit
6eb5a5f4d9
@ -45,9 +45,6 @@
|
||||
M(CreatedReadBufferAIOFailed, "") \
|
||||
M(CreatedReadBufferMMap, "") \
|
||||
M(CreatedReadBufferMMapFailed, "") \
|
||||
M(CreatedWriteBufferOrdinary, "") \
|
||||
M(CreatedWriteBufferAIO, "") \
|
||||
M(CreatedWriteBufferAIOFailed, "") \
|
||||
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This include reads from page cache.") \
|
||||
M(DiskWriteElapsedMicroseconds, "Total time spent waiting for write syscall. This include writes to page cache.") \
|
||||
M(NetworkReceiveElapsedMicroseconds, "") \
|
||||
|
@ -21,6 +21,8 @@
|
||||
#include <ext/bit_cast.h>
|
||||
#include <filesystem>
|
||||
#include <city.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
@ -16,7 +16,6 @@
|
||||
#include <Core/Block.h>
|
||||
#include <Dictionaries/BucketCache.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#include <list>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Poco/Logger.h>
|
||||
|
@ -22,6 +22,8 @@
|
||||
#include <numeric>
|
||||
#include <filesystem>
|
||||
#include <city.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
@ -19,7 +19,6 @@
|
||||
#include <Dictionaries/BucketCache.h>
|
||||
#include <ext/scope_guard.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#include <list>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Poco/Logger.h>
|
||||
|
@ -146,7 +146,7 @@ DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimate
|
||||
auto tmp_path = path + ".tmp";
|
||||
{
|
||||
auto src_buffer = DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
|
||||
auto dst_buffer = cache_disk->writeFile(tmp_path, buf_size, WriteMode::Rewrite, estimated_size, aio_threshold);
|
||||
auto dst_buffer = cache_disk->writeFile(tmp_path, buf_size, WriteMode::Rewrite);
|
||||
copyData(*src_buffer, *dst_buffer);
|
||||
}
|
||||
cache_disk->moveFile(tmp_path, path);
|
||||
@ -175,10 +175,10 @@ DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimate
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
|
||||
DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
{
|
||||
if (!cache_file_predicate(path))
|
||||
return DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
|
||||
return DiskDecorator::writeFile(path, buf_size, mode);
|
||||
|
||||
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write file {} to cache", backQuote(path));
|
||||
|
||||
@ -187,12 +187,12 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
|
||||
cache_disk->createDirectories(dir_path);
|
||||
|
||||
return std::make_unique<CompletionAwareWriteBuffer>(
|
||||
cache_disk->writeFile(path, buf_size, mode, estimated_size, aio_threshold),
|
||||
[this, path, buf_size, mode, estimated_size, aio_threshold]()
|
||||
cache_disk->writeFile(path, buf_size, mode),
|
||||
[this, path, buf_size, mode]()
|
||||
{
|
||||
/// Copy file from cache to actual disk when cached buffer is finalized.
|
||||
auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0);
|
||||
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
|
||||
auto src_buffer = cache_disk->readFile(path, buf_size, 0, 0, 0);
|
||||
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
|
||||
copyData(*src_buffer, *dst_buffer);
|
||||
dst_buffer->finalize();
|
||||
},
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase>
|
||||
readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const override;
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold) override;
|
||||
writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
void remove(const String & path) override;
|
||||
void removeRecursive(const String & path) override;
|
||||
void createHardLink(const String & src_path, const String & dst_path) override;
|
||||
|
@ -125,9 +125,9 @@ DiskDecorator::readFile(const String & path, size_t buf_size, size_t estimated_s
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
DiskDecorator::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
|
||||
DiskDecorator::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
{
|
||||
return delegate->writeFile(path, buf_size, mode, estimated_size, aio_threshold);
|
||||
return delegate->writeFile(path, buf_size, mode);
|
||||
}
|
||||
|
||||
void DiskDecorator::remove(const String & path)
|
||||
|
@ -38,7 +38,7 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase>
|
||||
readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const override;
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold) override;
|
||||
writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
void remove(const String & path) override;
|
||||
void removeRecursive(const String & path) override;
|
||||
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <Common/quoteString.h>
|
||||
|
||||
#include <IO/createReadBufferFromFileBase.h>
|
||||
#include <IO/createWriteBufferFromFileBase.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <unistd.h>
|
||||
|
||||
@ -232,10 +231,10 @@ DiskLocal::readFile(const String & path, size_t buf_size, size_t estimated_size,
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
|
||||
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
{
|
||||
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
|
||||
return createWriteBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, buf_size, flags);
|
||||
return std::make_unique<WriteBufferFromFile>(disk_path + path, buf_size, flags);
|
||||
}
|
||||
|
||||
void DiskLocal::remove(const String & path)
|
||||
|
@ -83,9 +83,7 @@ public:
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size,
|
||||
WriteMode mode,
|
||||
size_t estimated_size,
|
||||
size_t aio_threshold) override;
|
||||
WriteMode mode) override;
|
||||
|
||||
void remove(const String & path) override;
|
||||
|
||||
|
@ -330,7 +330,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path
|
||||
return std::make_unique<ReadIndirectBuffer>(path, iter->second.data);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskMemory::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskMemory::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
|
@ -74,9 +74,7 @@ public:
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size,
|
||||
WriteMode mode,
|
||||
size_t estimated_size,
|
||||
size_t aio_threshold) override;
|
||||
WriteMode mode) override;
|
||||
|
||||
void remove(const String & path) override;
|
||||
|
||||
|
@ -148,9 +148,7 @@ public:
|
||||
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
WriteMode mode = WriteMode::Rewrite,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0) = 0;
|
||||
WriteMode mode = WriteMode::Rewrite) = 0;
|
||||
|
||||
/// Remove file or directory. Throws exception if file doesn't exists or if directory is not empty.
|
||||
virtual void remove(const String & path) = 0;
|
||||
|
@ -665,7 +665,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
{
|
||||
bool exist = exists(path);
|
||||
if (exist && readMeta(path).read_only)
|
||||
|
@ -88,9 +88,7 @@ public:
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size,
|
||||
WriteMode mode,
|
||||
size_t estimated_size,
|
||||
size_t aio_threshold) override;
|
||||
WriteMode mode) override;
|
||||
|
||||
void remove(const String & path) override;
|
||||
|
||||
|
@ -1,441 +0,0 @@
|
||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
||||
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#include <Common/MemorySanitizer.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
#include <limits>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <errno.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event FileOpen;
|
||||
extern const Event WriteBufferAIOWrite;
|
||||
extern const Event WriteBufferAIOWriteBytes;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric Write;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int CANNOT_OPEN_FILE;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
extern const int AIO_READ_ERROR;
|
||||
extern const int AIO_WRITE_ERROR;
|
||||
extern const int CANNOT_IO_SUBMIT;
|
||||
extern const int CANNOT_IO_GETEVENTS;
|
||||
extern const int CANNOT_TRUNCATE_FILE;
|
||||
extern const int CANNOT_FSYNC;
|
||||
}
|
||||
|
||||
|
||||
/// Note: an additional page is allocated that will contain data that
|
||||
/// do not fit into the main buffer.
|
||||
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
|
||||
char * existing_memory_)
|
||||
: WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
|
||||
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
|
||||
filename(filename_)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||
|
||||
/// Correct the buffer size information so that additional pages do not touch the base class `BufferBase`.
|
||||
this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
this->internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
flush_buffer.buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
flush_buffer.internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
int open_flags = (flags_ == -1) ? (O_RDWR | O_TRUNC | O_CREAT) : flags_;
|
||||
open_flags |= O_DIRECT;
|
||||
open_flags |= O_CLOEXEC;
|
||||
|
||||
fd = ::open(filename.c_str(), open_flags, mode_);
|
||||
if (fd == -1)
|
||||
{
|
||||
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
|
||||
throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
|
||||
}
|
||||
}
|
||||
|
||||
WriteBufferAIO::~WriteBufferAIO()
|
||||
{
|
||||
if (!aio_failed)
|
||||
{
|
||||
try
|
||||
{
|
||||
flush();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
if (fd != -1)
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
off_t WriteBufferAIO::getPositionInFile()
|
||||
{
|
||||
return seek(0, SEEK_CUR);
|
||||
}
|
||||
|
||||
void WriteBufferAIO::sync()
|
||||
{
|
||||
flush();
|
||||
|
||||
/// Ask OS to flush data to disk.
|
||||
int res = ::fsync(fd);
|
||||
if (res == -1)
|
||||
throwFromErrnoWithPath("Cannot fsync " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC);
|
||||
}
|
||||
|
||||
void WriteBufferAIO::nextImpl()
|
||||
{
|
||||
if (!offset())
|
||||
return;
|
||||
|
||||
if (waitForAIOCompletion())
|
||||
finalize();
|
||||
|
||||
/// Create a request for asynchronous write.
|
||||
prepare();
|
||||
|
||||
#if defined(__FreeBSD__)
|
||||
request.aio.aio_lio_opcode = LIO_WRITE;
|
||||
request.aio.aio_fildes = fd;
|
||||
request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
|
||||
request.aio.aio_nbytes = region_aligned_size;
|
||||
request.aio.aio_offset = region_aligned_begin;
|
||||
#else
|
||||
request.aio_lio_opcode = IOCB_CMD_PWRITE;
|
||||
request.aio_fildes = fd;
|
||||
request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
|
||||
request.aio_nbytes = region_aligned_size;
|
||||
request.aio_offset = region_aligned_begin;
|
||||
#endif
|
||||
|
||||
/// Send the request.
|
||||
while (io_submit(aio_context.ctx, 1, &request_ptr) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
{
|
||||
aio_failed = true;
|
||||
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::CANNOT_IO_SUBMIT);
|
||||
}
|
||||
}
|
||||
|
||||
is_pending_write = true;
|
||||
}
|
||||
|
||||
off_t WriteBufferAIO::seek(off_t off, int whence)
|
||||
{
|
||||
flush();
|
||||
|
||||
if (whence == SEEK_SET)
|
||||
{
|
||||
if (off < 0)
|
||||
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
pos_in_file = off;
|
||||
}
|
||||
else if (whence == SEEK_CUR)
|
||||
{
|
||||
if (off >= 0)
|
||||
{
|
||||
if (off > (std::numeric_limits<off_t>::max() - pos_in_file))
|
||||
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
}
|
||||
else if (off < -pos_in_file)
|
||||
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
pos_in_file += off;
|
||||
}
|
||||
else
|
||||
throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
|
||||
if (pos_in_file > max_pos_in_file)
|
||||
max_pos_in_file = pos_in_file;
|
||||
|
||||
return pos_in_file;
|
||||
}
|
||||
|
||||
void WriteBufferAIO::truncate(off_t length)
|
||||
{
|
||||
flush();
|
||||
|
||||
int res = ::ftruncate(fd, length);
|
||||
if (res == -1)
|
||||
throwFromErrnoWithPath("Cannot truncate file " + filename, filename, ErrorCodes::CANNOT_TRUNCATE_FILE);
|
||||
}
|
||||
|
||||
void WriteBufferAIO::flush()
|
||||
{
|
||||
next();
|
||||
if (waitForAIOCompletion())
|
||||
finalize();
|
||||
}
|
||||
|
||||
bool WriteBufferAIO::waitForAIOCompletion()
|
||||
{
|
||||
if (!is_pending_write)
|
||||
return false;
|
||||
|
||||
CurrentMetrics::Increment metric_increment_write{CurrentMetrics::Write};
|
||||
|
||||
io_event event;
|
||||
while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
{
|
||||
aio_failed = true;
|
||||
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::CANNOT_IO_GETEVENTS);
|
||||
}
|
||||
}
|
||||
|
||||
// Unpoison the memory returned from an uninstrumented system function.
|
||||
__msan_unpoison(&event, sizeof(event));
|
||||
|
||||
is_pending_write = false;
|
||||
#if defined(__FreeBSD__)
|
||||
bytes_written = aio_return(reinterpret_cast<struct aiocb *>(event.udata));
|
||||
#else
|
||||
bytes_written = event.res;
|
||||
#endif
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void WriteBufferAIO::prepare()
|
||||
{
|
||||
/// Swap the main and duplicate buffers.
|
||||
swap(flush_buffer);
|
||||
|
||||
truncation_count = 0;
|
||||
|
||||
/*
|
||||
A page on disk or in memory
|
||||
|
||||
start address (starting position in case of disk) is a multiply of DEFAULT_AIO_FILE_BLOCK_SIZE
|
||||
:
|
||||
:
|
||||
+---------------+
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
+---------------+
|
||||
<--------------->
|
||||
:
|
||||
:
|
||||
DEFAULT_AIO_FILE_BLOCK_SIZE
|
||||
|
||||
*/
|
||||
|
||||
/*
|
||||
Representation of data on a disk
|
||||
|
||||
XXX : the data you want to write
|
||||
ZZZ : data that is already on disk or zeros, if there is no data
|
||||
|
||||
region_aligned_begin region_aligned_end
|
||||
: region_begin region_end :
|
||||
: : : :
|
||||
: : : :
|
||||
+---:-----------+---------------+---------------+---------------+--:------------+
|
||||
| : | | | | : |
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
| | | | | |
|
||||
+---------------+---------------+---------------+---------------+---------------+
|
||||
|
||||
<--><--------------------------------------------------------------><----------->
|
||||
: : :
|
||||
: : :
|
||||
region_left_padding region_size region_right_padding
|
||||
|
||||
<------------------------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
region_aligned_size
|
||||
*/
|
||||
|
||||
/// Region of the disk in which we want to write data.
|
||||
const off_t region_begin = pos_in_file;
|
||||
|
||||
if ((flush_buffer.offset() > static_cast<size_t>(std::numeric_limits<off_t>::max())) ||
|
||||
(pos_in_file > (std::numeric_limits<off_t>::max() - static_cast<off_t>(flush_buffer.offset()))))
|
||||
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
const off_t region_end = pos_in_file + flush_buffer.offset();
|
||||
const size_t region_size = region_end - region_begin;
|
||||
|
||||
/// The aligned region of the disk into which we want to write the data.
|
||||
const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
region_aligned_begin = region_begin - region_left_padding;
|
||||
|
||||
if (region_end > (std::numeric_limits<off_t>::max() - static_cast<off_t>(region_right_padding)))
|
||||
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
const off_t region_aligned_end = region_end + region_right_padding;
|
||||
region_aligned_size = region_aligned_end - region_aligned_begin;
|
||||
|
||||
bytes_to_write = region_aligned_size;
|
||||
|
||||
/*
|
||||
Representing data in the buffer before processing
|
||||
|
||||
XXX : the data you want to write
|
||||
|
||||
buffer_begin buffer_end
|
||||
: :
|
||||
: :
|
||||
+---------------+---------------+---------------+-------------:-+
|
||||
| | | | : |
|
||||
+---------------+---------------+---------------+-------------+ |
|
||||
|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
|
||||
|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
|
||||
+---------------+---------------+---------------+-------------+ |
|
||||
| | | | |
|
||||
+---------------+---------------+---------------+---------------+
|
||||
|
||||
<------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_size
|
||||
*/
|
||||
|
||||
/// The buffer of data that we want to write to the disk.
|
||||
buffer_begin = flush_buffer.buffer().begin();
|
||||
Position buffer_end = buffer_begin + region_size;
|
||||
size_t buffer_size = buffer_end - buffer_begin;
|
||||
|
||||
/// Process the buffer so that it reflects the structure of the disk region.
|
||||
|
||||
/*
|
||||
Representation of data in the buffer after processing
|
||||
|
||||
XXX : the data you want to write
|
||||
ZZZ : data from disk or zeros, if there is no data
|
||||
|
||||
`buffer_begin` `buffer_end` extra page
|
||||
: : :
|
||||
: : :
|
||||
+---:-----------+---------------+---------------+---------------+--:------------+
|
||||
| | | | | : |
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
| | | | | |
|
||||
+---------------+---------------+---------------+---------------+---------------+
|
||||
|
||||
<--><--------------------------------------------------------------><----------->
|
||||
: : :
|
||||
: : :
|
||||
region_left_padding region_size region_right_padding
|
||||
|
||||
<------------------------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
region_aligned_size
|
||||
*/
|
||||
|
||||
if ((region_left_padding > 0) || (region_right_padding > 0))
|
||||
{
|
||||
char memory_page[DEFAULT_AIO_FILE_BLOCK_SIZE] __attribute__ ((aligned (DEFAULT_AIO_FILE_BLOCK_SIZE)));
|
||||
|
||||
if (region_left_padding > 0)
|
||||
{
|
||||
/// Move the buffer data to the right. Complete the beginning of the buffer with data from the disk.
|
||||
buffer_size += region_left_padding;
|
||||
buffer_end = buffer_begin + buffer_size;
|
||||
|
||||
::memmove(buffer_begin + region_left_padding, buffer_begin, (buffer_size - region_left_padding) * sizeof(*buffer_begin));
|
||||
|
||||
ssize_t read_count = ::pread(fd, memory_page, DEFAULT_AIO_FILE_BLOCK_SIZE, region_aligned_begin);
|
||||
if (read_count < 0)
|
||||
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
|
||||
|
||||
size_t to_copy = std::min(static_cast<size_t>(read_count), region_left_padding);
|
||||
::memcpy(buffer_begin, memory_page, to_copy * sizeof(*buffer_begin));
|
||||
::memset(buffer_begin + to_copy, 0, (region_left_padding - to_copy) * sizeof(*buffer_begin));
|
||||
}
|
||||
|
||||
if (region_right_padding > 0)
|
||||
{
|
||||
/// Add the end of the buffer with data from the disk.
|
||||
ssize_t read_count = ::pread(fd, memory_page, DEFAULT_AIO_FILE_BLOCK_SIZE, region_aligned_end - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
if (read_count < 0)
|
||||
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
|
||||
|
||||
Position truncation_begin;
|
||||
off_t offset = DEFAULT_AIO_FILE_BLOCK_SIZE - region_right_padding;
|
||||
if (read_count > offset)
|
||||
{
|
||||
::memcpy(buffer_end, memory_page + offset, (read_count - offset) * sizeof(*buffer_end));
|
||||
truncation_begin = buffer_end + (read_count - offset);
|
||||
truncation_count = DEFAULT_AIO_FILE_BLOCK_SIZE - read_count;
|
||||
}
|
||||
else
|
||||
{
|
||||
truncation_begin = buffer_end;
|
||||
truncation_count = region_right_padding;
|
||||
}
|
||||
|
||||
::memset(truncation_begin, 0, truncation_count * sizeof(*truncation_begin));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferAIO::finalize()
|
||||
{
|
||||
if (bytes_written < bytes_to_write)
|
||||
throw Exception("Asynchronous write error on file " + filename, ErrorCodes::AIO_WRITE_ERROR);
|
||||
|
||||
bytes_written -= truncation_count;
|
||||
|
||||
#if defined(__FreeBSD__)
|
||||
off_t aio_offset = request.aio.aio_offset;
|
||||
#else
|
||||
off_t aio_offset = request.aio_offset;
|
||||
#endif
|
||||
off_t pos_offset = bytes_written - (pos_in_file - aio_offset);
|
||||
|
||||
if (pos_in_file > (std::numeric_limits<off_t>::max() - pos_offset))
|
||||
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
|
||||
pos_in_file += pos_offset;
|
||||
|
||||
if (pos_in_file > max_pos_in_file)
|
||||
max_pos_in_file = pos_in_file;
|
||||
|
||||
if (truncation_count > 0)
|
||||
{
|
||||
/// Truncate the file to remove unnecessary zeros from it.
|
||||
int res = ::ftruncate(fd, max_pos_in_file);
|
||||
if (res == -1)
|
||||
throwFromErrnoWithPath("Cannot truncate file " + filename, filename, ErrorCodes::CANNOT_TRUNCATE_FILE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,102 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
||||
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
#include <IO/AIO.h>
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric OpenFileForWrite;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Class for asynchronous data writing.
|
||||
*/
|
||||
class WriteBufferAIO final : public WriteBufferFromFileBase
|
||||
{
|
||||
public:
|
||||
WriteBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
|
||||
char * existing_memory_ = nullptr);
|
||||
~WriteBufferAIO() override;
|
||||
|
||||
WriteBufferAIO(const WriteBufferAIO &) = delete;
|
||||
WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
|
||||
|
||||
off_t getPositionInFile();
|
||||
off_t seek(off_t off, int whence);
|
||||
void truncate(off_t length);
|
||||
void sync() override;
|
||||
std::string getFileName() const override { return filename; }
|
||||
int getFD() const { return fd; }
|
||||
|
||||
private:
|
||||
void nextImpl() override;
|
||||
|
||||
/// If there's still data in the buffer, we'll write them.
|
||||
void flush();
|
||||
/// Wait for the end of the current asynchronous task.
|
||||
bool waitForAIOCompletion();
|
||||
/// Prepare an asynchronous request.
|
||||
void prepare();
|
||||
///
|
||||
void finalize() override;
|
||||
|
||||
private:
|
||||
/// Buffer for asynchronous data writes.
|
||||
BufferWithOwnMemory<WriteBuffer> flush_buffer;
|
||||
|
||||
/// Description of the asynchronous write request.
|
||||
iocb request{};
|
||||
iocb * request_ptr{&request};
|
||||
|
||||
AIOContext aio_context{1};
|
||||
|
||||
const std::string filename;
|
||||
|
||||
/// The number of bytes to be written to the disk.
|
||||
off_t bytes_to_write = 0;
|
||||
/// Number of bytes written with the last request.
|
||||
off_t bytes_written = 0;
|
||||
/// The number of zero bytes to be cut from the end of the file
|
||||
/// after the data write operation completes.
|
||||
off_t truncation_count = 0;
|
||||
|
||||
/// The current position in the file.
|
||||
off_t pos_in_file = 0;
|
||||
/// The maximum position reached in the file.
|
||||
off_t max_pos_in_file = 0;
|
||||
|
||||
/// The starting position of the aligned region of the disk to which the data is written.
|
||||
off_t region_aligned_begin = 0;
|
||||
/// The size of the aligned region of the disk.
|
||||
size_t region_aligned_size = 0;
|
||||
|
||||
/// The file descriptor for writing.
|
||||
int fd = -1;
|
||||
|
||||
/// The data buffer that we want to write to the disk.
|
||||
Position buffer_begin = nullptr;
|
||||
|
||||
/// Is the asynchronous write operation still in progress?
|
||||
bool is_pending_write = false;
|
||||
/// Did the asynchronous operation fail?
|
||||
bool aio_failed = false;
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,48 +0,0 @@
|
||||
#include <IO/createWriteBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#endif
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event CreatedWriteBufferOrdinary;
|
||||
extern const Event CreatedWriteBufferAIO;
|
||||
extern const Event CreatedWriteBufferAIOFailed;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(const std::string & filename_, size_t estimated_size,
|
||||
size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_,
|
||||
size_t alignment)
|
||||
{
|
||||
#if defined(OS_LINUX) || defined(__FreeBSD__)
|
||||
if (aio_threshold && estimated_size >= aio_threshold)
|
||||
{
|
||||
/// Attempt to open a file with O_DIRECT
|
||||
try
|
||||
{
|
||||
auto res = std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
|
||||
return res;
|
||||
}
|
||||
catch (const ErrnoException &)
|
||||
{
|
||||
/// Fallback to cached IO if O_DIRECT is not supported.
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIOFailed);
|
||||
}
|
||||
}
|
||||
#else
|
||||
(void)aio_threshold;
|
||||
(void)estimated_size;
|
||||
#endif
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferOrdinary);
|
||||
return std::make_unique<WriteBufferFromFile>(filename_, buffer_size_, flags_, mode, existing_memory_, alignment);
|
||||
}
|
||||
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Create an object to write data to a file.
|
||||
* estimated_size - number of bytes to write
|
||||
* aio_threshold - the minimum number of bytes for asynchronous writes
|
||||
*
|
||||
* If aio_threshold = 0 or estimated_size < aio_threshold, the write operations are executed synchronously.
|
||||
* Otherwise, write operations are performed asynchronously.
|
||||
*/
|
||||
std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(
|
||||
const std::string & filename_,
|
||||
size_t estimated_size,
|
||||
size_t aio_threshold,
|
||||
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
int flags_ = -1,
|
||||
mode_t mode = 0666,
|
||||
char * existing_memory_ = nullptr,
|
||||
size_t alignment = 0);
|
||||
|
||||
}
|
@ -55,9 +55,6 @@ add_executable (write_int write_int.cpp)
|
||||
target_link_libraries (write_int PRIVATE clickhouse_common_io)
|
||||
|
||||
if (OS_LINUX OR OS_FREEBSD)
|
||||
add_executable(write_buffer_aio write_buffer_aio.cpp)
|
||||
target_link_libraries (write_buffer_aio PRIVATE clickhouse_common_io)
|
||||
|
||||
add_executable(read_buffer_aio read_buffer_aio.cpp)
|
||||
target_link_libraries (read_buffer_aio PRIVATE clickhouse_common_io)
|
||||
endif ()
|
||||
|
@ -1,498 +0,0 @@
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#include <Core/Defines.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <streambuf>
|
||||
#include <cstdlib>
|
||||
#include <functional>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
void run();
|
||||
[[noreturn]] void die(const std::string & msg);
|
||||
void runTest(unsigned int num, const std::function<bool()> & func);
|
||||
std::string createTmpFile();
|
||||
std::string generateString(size_t n);
|
||||
|
||||
bool test1();
|
||||
bool test2();
|
||||
bool test3();
|
||||
bool test4();
|
||||
bool test5();
|
||||
bool test6();
|
||||
bool test7();
|
||||
bool test8();
|
||||
bool test9();
|
||||
bool test10();
|
||||
|
||||
void run()
|
||||
{
|
||||
const std::vector<std::function<bool()>> tests =
|
||||
{
|
||||
test1,
|
||||
test2,
|
||||
test3,
|
||||
test4,
|
||||
test5,
|
||||
test6,
|
||||
test7,
|
||||
test8,
|
||||
test9,
|
||||
test10
|
||||
};
|
||||
|
||||
unsigned int num = 0;
|
||||
for (const auto & test : tests)
|
||||
{
|
||||
++num;
|
||||
runTest(num, test);
|
||||
}
|
||||
}
|
||||
|
||||
void die(const std::string & msg)
|
||||
{
|
||||
std::cout << msg;
|
||||
::exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
void runTest(unsigned int num, const std::function<bool()> & func)
|
||||
{
|
||||
bool ok;
|
||||
|
||||
try
|
||||
{
|
||||
ok = func();
|
||||
}
|
||||
catch (const DB::Exception & ex)
|
||||
{
|
||||
ok = false;
|
||||
std::cout << "Caught exception " << ex.displayText() << "\n";
|
||||
}
|
||||
catch (const std::exception & ex)
|
||||
{
|
||||
ok = false;
|
||||
std::cout << "Caught exception " << ex.what() << "\n";
|
||||
}
|
||||
|
||||
if (ok)
|
||||
std::cout << "Test " << num << " passed\n";
|
||||
else
|
||||
std::cout << "Test " << num << " failed\n";
|
||||
}
|
||||
|
||||
std::string createTmpFile()
|
||||
{
|
||||
char pattern[] = "/tmp/fileXXXXXX";
|
||||
char * dir = ::mkdtemp(pattern);
|
||||
if (dir == nullptr)
|
||||
die("Could not create directory");
|
||||
|
||||
return std::string(dir) + "/foo";
|
||||
}
|
||||
|
||||
std::string generateString(size_t n)
|
||||
{
|
||||
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
|
||||
|
||||
std::string buf;
|
||||
buf.reserve(n);
|
||||
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
buf += symbols[i % symbols.length()];
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
bool test1()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.write(buf.data(), buf.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
return (received == buf);
|
||||
}
|
||||
|
||||
bool test2()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.write(buf.data(), buf.length() / 2);
|
||||
out.seek(DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
|
||||
out.write(&buf[buf.length() / 2], buf.length() / 2);
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
if (received.substr(0, buf.length() / 2) != buf.substr(0, buf.length() / 2))
|
||||
return false;
|
||||
if (received.substr(buf.length() / 2, DEFAULT_AIO_FILE_BLOCK_SIZE) != std::string(DEFAULT_AIO_FILE_BLOCK_SIZE, '\0'))
|
||||
return false;
|
||||
if (received.substr(buf.length() / 2 + DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(buf.length() / 2))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool test3()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.write(buf.data(), buf.length());
|
||||
|
||||
off_t pos1 = out.getPositionInFile();
|
||||
|
||||
out.truncate(buf.length() / 2);
|
||||
|
||||
off_t pos2 = out.getPositionInFile();
|
||||
|
||||
if (pos1 != pos2)
|
||||
return false;
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
return (received == buf.substr(0, buf.length() / 2));
|
||||
}
|
||||
|
||||
bool test4()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.write(buf.data(), buf.length());
|
||||
|
||||
off_t pos1 = out.getPositionInFile();
|
||||
|
||||
out.truncate(3 * buf.length() / 2);
|
||||
|
||||
off_t pos2 = out.getPositionInFile();
|
||||
|
||||
if (pos1 != pos2)
|
||||
return false;
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
if (received.substr(0, buf.length()) != buf)
|
||||
return false;
|
||||
|
||||
if (received.substr(buf.length()) != std::string(buf.length() / 2, '\0'))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool test5()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.seek(1, SEEK_SET);
|
||||
out.write(buf.data(), buf.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
return received.substr(1) == buf;
|
||||
}
|
||||
|
||||
bool test6()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
std::string buf2 = "1111111111";
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.seek(3, SEEK_SET);
|
||||
out.write(buf.data(), buf.length());
|
||||
out.seek(-2 * DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
|
||||
out.write(buf2.data(), buf2.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
if (received.substr(3, 8 * DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(0, 8 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||
return false;
|
||||
|
||||
if (received.substr(3 + 8 * DEFAULT_AIO_FILE_BLOCK_SIZE, 10) != buf2)
|
||||
return false;
|
||||
|
||||
if (received.substr(13 + 8 * DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(10 + 8 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool test7()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
std::string buf2 = "11111111112222222222";
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.seek(DEFAULT_AIO_FILE_BLOCK_SIZE - (buf2.length() / 2), SEEK_SET);
|
||||
out.write(buf2.data(), buf2.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
if (received.length() != 4106)
|
||||
return false;
|
||||
if (received.substr(0, 4086) != std::string(4086, '\0'))
|
||||
return false;
|
||||
if (received.substr(4086, 20) != buf2)
|
||||
return false;
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool test8()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
std::string buf2 = "11111111112222222222";
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE - (buf2.length() / 2), SEEK_SET);
|
||||
out.write(buf2.data(), buf2.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
if (received.length() != 8202)
|
||||
return false;
|
||||
if (received.substr(0, 8182) != std::string(8182, '\0'))
|
||||
return false;
|
||||
if (received.substr(8182, 20) != buf2)
|
||||
return false;
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool test9()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 3 * DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
std::string buf2(DEFAULT_AIO_FILE_BLOCK_SIZE + 10, '1');
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.seek(3, SEEK_SET);
|
||||
out.write(buf.data(), buf.length());
|
||||
out.seek(-DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
|
||||
out.write(buf2.data(), buf2.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
if (received.substr(3, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(0, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||
return false;
|
||||
|
||||
if (received.substr(3 + 2 * DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE + 10) != buf2)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool test10()
|
||||
{
|
||||
std::string filename = createTmpFile();
|
||||
|
||||
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE + 3;
|
||||
|
||||
std::string buf = generateString(n);
|
||||
|
||||
{
|
||||
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||
|
||||
if (out.getFileName() != filename)
|
||||
return false;
|
||||
if (out.getFD() == -1)
|
||||
return false;
|
||||
|
||||
out.write(buf.data(), buf.length());
|
||||
}
|
||||
|
||||
std::ifstream in(filename.c_str());
|
||||
if (!in.is_open())
|
||||
die("Could not open file");
|
||||
|
||||
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
|
||||
|
||||
in.close();
|
||||
fs::remove_all(fs::path(filename).parent_path().string());
|
||||
|
||||
return (received == buf);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
run();
|
||||
return 0;
|
||||
}
|
@ -1,5 +1,4 @@
|
||||
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
|
||||
#include <IO/createWriteBufferFromFileBase.h>
|
||||
#include <Storages/MergeTree/MergeTreeIOSettings.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
|
||||
|
||||
|
@ -910,7 +910,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
if (metadata_snapshot->hasSecondaryIndices())
|
||||
{
|
||||
const auto & indices = metadata_snapshot->getSecondaryIndices();
|
||||
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, indices.getSingleExpressionForIndices(metadata_snapshot->getColumns(), data.global_context));
|
||||
merged_stream = std::make_shared<ExpressionBlockInputStream>(
|
||||
merged_stream, indices.getSingleExpressionForIndices(metadata_snapshot->getColumns(), data.global_context));
|
||||
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
|
||||
}
|
||||
|
||||
@ -921,7 +922,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
merging_columns,
|
||||
index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
|
||||
compression_codec,
|
||||
data_settings->min_merge_bytes_to_use_direct_io,
|
||||
blocks_are_granules_size};
|
||||
|
||||
merged_stream->readPrefix();
|
||||
|
@ -24,9 +24,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
|
||||
, plain_file(data_part->volume->getDisk()->writeFile(
|
||||
part_path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
|
||||
settings.max_compress_block_size,
|
||||
WriteMode::Rewrite,
|
||||
settings.estimated_size,
|
||||
settings.aio_threshold))
|
||||
WriteMode::Rewrite))
|
||||
, plain_hashing(*plain_file)
|
||||
, marks_file(data_part->volume->getDisk()->writeFile(
|
||||
part_path + MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
|
||||
|
@ -39,13 +39,11 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
|
||||
const std::string & marks_path_,
|
||||
const std::string & marks_file_extension_,
|
||||
const CompressionCodecPtr & compression_codec_,
|
||||
size_t max_compress_block_size_,
|
||||
size_t estimated_size_,
|
||||
size_t aio_threshold_) :
|
||||
size_t max_compress_block_size_) :
|
||||
escaped_column_name(escaped_column_name_),
|
||||
data_file_extension{data_file_extension_},
|
||||
marks_file_extension{marks_file_extension_},
|
||||
plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite, estimated_size_, aio_threshold_)),
|
||||
plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite)),
|
||||
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf),
|
||||
marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file)
|
||||
{
|
||||
@ -164,8 +162,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
|
||||
data_part->volume->getDisk(),
|
||||
part_path + stream_name, INDEX_FILE_EXTENSION,
|
||||
part_path + stream_name, marks_file_extension,
|
||||
default_codec, settings.max_compress_block_size,
|
||||
0, settings.aio_threshold));
|
||||
default_codec, settings.max_compress_block_size));
|
||||
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
|
||||
skip_index_accumulated_marks.push_back(0);
|
||||
}
|
||||
|
@ -56,9 +56,7 @@ public:
|
||||
const std::string & marks_path_,
|
||||
const std::string & marks_file_extension_,
|
||||
const CompressionCodecPtr & compression_codec_,
|
||||
size_t max_compress_block_size_,
|
||||
size_t estimated_size_,
|
||||
size_t aio_threshold_);
|
||||
size_t max_compress_block_size_);
|
||||
|
||||
String escaped_column_name;
|
||||
std::string data_file_extension;
|
||||
|
@ -80,14 +80,13 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
|
||||
{
|
||||
const auto & columns = metadata_snapshot->getColumns();
|
||||
for (const auto & it : columns_list)
|
||||
addStreams(it.name, *it.type, columns.getCodecDescOrDefault(it.name, default_codec), settings.estimated_size);
|
||||
addStreams(it.name, *it.type, columns.getCodecDescOrDefault(it.name, default_codec));
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::addStreams(
|
||||
const String & name,
|
||||
const IDataType & type,
|
||||
const ASTPtr & effective_codec_desc,
|
||||
size_t estimated_size)
|
||||
const ASTPtr & effective_codec_desc)
|
||||
{
|
||||
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
|
||||
{
|
||||
@ -109,9 +108,7 @@ void MergeTreeDataPartWriterWide::addStreams(
|
||||
part_path + stream_name, DATA_FILE_EXTENSION,
|
||||
part_path + stream_name, marks_file_extension,
|
||||
compression_codec,
|
||||
settings.max_compress_block_size,
|
||||
estimated_size,
|
||||
settings.aio_threshold);
|
||||
settings.max_compress_block_size);
|
||||
};
|
||||
|
||||
IDataType::SubstreamPath stream_path;
|
||||
|
@ -85,8 +85,7 @@ private:
|
||||
void addStreams(
|
||||
const String & name,
|
||||
const IDataType & type,
|
||||
const ASTPtr & effective_codec_desc,
|
||||
size_t estimated_size);
|
||||
const ASTPtr & effective_codec_desc);
|
||||
|
||||
/// Method for self check (used in debug-build only). Checks that written
|
||||
/// data and corresponding marks are consistent. Otherwise throws logical
|
||||
|
@ -24,7 +24,6 @@ struct MergeTreeWriterSettings
|
||||
const Settings & global_settings,
|
||||
const MergeTreeSettingsPtr & storage_settings,
|
||||
bool can_use_adaptive_granularity_,
|
||||
size_t aio_threshold_,
|
||||
bool rewrite_primary_key_,
|
||||
bool blocks_are_granules_size_ = false)
|
||||
: min_compress_block_size(
|
||||
@ -32,7 +31,6 @@ struct MergeTreeWriterSettings
|
||||
, max_compress_block_size(
|
||||
storage_settings->max_compress_block_size ? storage_settings->max_compress_block_size
|
||||
: global_settings.max_compress_block_size)
|
||||
, aio_threshold(aio_threshold_)
|
||||
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
|
||||
, rewrite_primary_key(rewrite_primary_key_)
|
||||
, blocks_are_granules_size(blocks_are_granules_size_)
|
||||
@ -41,14 +39,9 @@ struct MergeTreeWriterSettings
|
||||
|
||||
size_t min_compress_block_size;
|
||||
size_t max_compress_block_size;
|
||||
size_t aio_threshold;
|
||||
bool can_use_adaptive_granularity;
|
||||
bool rewrite_primary_key;
|
||||
bool blocks_are_granules_size;
|
||||
|
||||
/// Used for AIO threshold comparison
|
||||
/// FIXME currently doesn't work because WriteBufferAIO contain obscure bug(s)
|
||||
size_t estimated_size = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -21,25 +21,6 @@ MergedBlockOutputStream::MergedBlockOutputStream(
|
||||
const MergeTreeIndices & skip_indices,
|
||||
CompressionCodecPtr default_codec_,
|
||||
bool blocks_are_granules_size)
|
||||
: MergedBlockOutputStream(
|
||||
data_part,
|
||||
metadata_snapshot_,
|
||||
columns_list_,
|
||||
skip_indices,
|
||||
default_codec_,
|
||||
data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io,
|
||||
blocks_are_granules_size)
|
||||
{
|
||||
}
|
||||
|
||||
MergedBlockOutputStream::MergedBlockOutputStream(
|
||||
const MergeTreeDataPartPtr & data_part,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const NamesAndTypesList & columns_list_,
|
||||
const MergeTreeIndices & skip_indices,
|
||||
CompressionCodecPtr default_codec_,
|
||||
size_t aio_threshold,
|
||||
bool blocks_are_granules_size)
|
||||
: IMergedBlockOutputStream(data_part, metadata_snapshot_)
|
||||
, columns_list(columns_list_)
|
||||
, default_codec(default_codec_)
|
||||
@ -48,7 +29,6 @@ MergedBlockOutputStream::MergedBlockOutputStream(
|
||||
storage.global_context.getSettings(),
|
||||
storage.getSettings(),
|
||||
data_part->index_granularity_info.is_adaptive,
|
||||
aio_threshold,
|
||||
/* rewrite_primary_key = */ true,
|
||||
blocks_are_granules_size);
|
||||
|
||||
|
@ -21,15 +21,6 @@ public:
|
||||
CompressionCodecPtr default_codec_,
|
||||
bool blocks_are_granules_size = false);
|
||||
|
||||
MergedBlockOutputStream(
|
||||
const MergeTreeDataPartPtr & data_part,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const NamesAndTypesList & columns_list_,
|
||||
const MergeTreeIndices & skip_indices,
|
||||
CompressionCodecPtr default_codec_,
|
||||
size_t aio_threshold,
|
||||
bool blocks_are_granules_size = false);
|
||||
|
||||
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
|
||||
|
||||
/// If the data is pre-sorted.
|
||||
|
Loading…
Reference in New Issue
Block a user