mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 08:52:06 +00:00
aio read
This commit is contained in:
parent
dbb565f34a
commit
2e10fe5878
@ -16,6 +16,7 @@
|
||||
#include <ext/range.h>
|
||||
#include <ext/size.h>
|
||||
#include <ext/bit_cast.h>
|
||||
#include <numeric>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -60,10 +61,10 @@ namespace ErrorCodes
|
||||
|
||||
namespace
|
||||
{
|
||||
constexpr size_t MAX_KEYS_TO_READ_ONCE = 128;
|
||||
constexpr size_t SSD_BLOCK_SIZE = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
//constexpr size_t MAX_ATTRIBUTES_SIZE = 1024;
|
||||
|
||||
constexpr size_t MAX_BLOCKS_TO_KEEP_IN_MEMORY = 16;
|
||||
|
||||
static constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
|
||||
static constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
|
||||
@ -183,8 +184,8 @@ CachePartition::CachePartition(
|
||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||
|
||||
const std::string filename = path + BIN_FILE_EXT;
|
||||
read_fd = ::open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_DIRECT, 0666);
|
||||
if (read_fd == -1)
|
||||
fd = ::open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_DIRECT, 0666);
|
||||
if (fd == -1)
|
||||
{
|
||||
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
|
||||
throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
|
||||
@ -194,7 +195,7 @@ CachePartition::CachePartition(
|
||||
|
||||
CachePartition::~CachePartition()
|
||||
{
|
||||
::close(read_fd);
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
void CachePartition::appendBlock(const Attribute & new_keys, const Attributes & new_attributes)
|
||||
@ -260,8 +261,6 @@ void CachePartition::appendBlock(const Attribute & new_keys, const Attributes &
|
||||
}
|
||||
}
|
||||
|
||||
flush();
|
||||
write_buffer.emplace(memory.data(), SSD_BLOCK_SIZE);
|
||||
++index;
|
||||
}
|
||||
}
|
||||
@ -316,19 +315,18 @@ void CachePartition::flush()
|
||||
|
||||
AIOContext aio_context{1};
|
||||
|
||||
iocb write_request;
|
||||
memset(&write_request, 0, sizeof(write_request));
|
||||
iocb write_request{};
|
||||
iocb * write_request_ptr{&write_request};
|
||||
|
||||
#if defined(__FreeBSD__)
|
||||
write_request.aio.aio_lio_opcode = LIO_WRITE;
|
||||
write_request.aio.aio_fildes = fd;
|
||||
write_request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
|
||||
write_request.aio.aio_nbytes = region_aligned_size;
|
||||
write_request.aio.aio_offset = region_aligned_begin;
|
||||
write_request.aio.aio_buf = reinterpret_cast<volatile void *>(memory.data());
|
||||
write_request.aio.aio_nbytes = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
write_request.aio.aio_offset = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
#else
|
||||
write_request.aio_lio_opcode = IOCB_CMD_PWRITE;
|
||||
write_request.aio_fildes = read_fd;
|
||||
write_request.aio_fildes = fd;
|
||||
write_request.aio_buf = reinterpret_cast<UInt64>(memory.data());
|
||||
write_request.aio_nbytes = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
write_request.aio_offset = DEFAULT_AIO_FILE_BLOCK_SIZE * current_file_block_id;
|
||||
@ -367,9 +365,8 @@ void CachePartition::flush()
|
||||
if (bytes_written != static_cast<decltype(bytes_written)>(write_request.aio_nbytes))
|
||||
throw Exception("Not all data was written for asynchronous IO on file " + path + BIN_FILE_EXT + ". returned: " + std::to_string(bytes_written), ErrorCodes::AIO_WRITE_ERROR);
|
||||
|
||||
int res = ::fsync(read_fd);
|
||||
if (res == -1)
|
||||
throwFromErrnoWithPath("Cannot fsync " + path, path, ErrorCodes::CANNOT_FSYNC);
|
||||
if (::fsync(fd) < 0)
|
||||
throwFromErrnoWithPath("Cannot fsync " + path + BIN_FILE_EXT, path + BIN_FILE_EXT, ErrorCodes::CANNOT_FSYNC);
|
||||
|
||||
/// commit changes in index
|
||||
for (size_t row = 0; row < ids.size(); ++row)
|
||||
@ -417,7 +414,6 @@ template <typename Out>
|
||||
void CachePartition::getValueFromMemory(
|
||||
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
|
||||
{
|
||||
//const auto & attribute = std::get<Attribute::Container<Out>>(attributes_buffer[attribute_index].values);
|
||||
for (size_t i = 0; i < indices.size(); ++i)
|
||||
{
|
||||
const auto & index = indices[i];
|
||||
@ -437,88 +433,113 @@ template <typename Out>
|
||||
void CachePartition::getValueFromStorage(
|
||||
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
|
||||
{
|
||||
std::vector<std::pair<UInt64, size_t>> index_to_out;
|
||||
std::vector<std::pair<Index, size_t>> index_to_out;
|
||||
for (size_t i = 0; i < indices.size(); ++i)
|
||||
{
|
||||
const auto & index = indices[i];
|
||||
if (index.exists() && !index.inMemory())
|
||||
index_to_out.emplace_back(index.getAddressInBlock(), i);
|
||||
index_to_out.emplace_back(index, i);
|
||||
}
|
||||
if (index_to_out.empty())
|
||||
return;
|
||||
|
||||
/// sort by (block_id, offset_in_block)
|
||||
std::sort(std::begin(index_to_out), std::end(index_to_out));
|
||||
|
||||
DB::Memory read_buffer(SSD_BLOCK_SIZE * index_to_out.size(), BUFFER_ALIGNMENT);
|
||||
DB::Memory read_buffer(SSD_BLOCK_SIZE * MAX_BLOCKS_TO_KEEP_IN_MEMORY, BUFFER_ALIGNMENT);
|
||||
|
||||
std::vector<iocb> requests(index_to_out.size());
|
||||
memset(requests.data(), 0, requests.size() * sizeof(requests.front()));
|
||||
std::vector<iocb*> pointers(index_to_out.size());
|
||||
std::vector<iocb> requests;
|
||||
std::vector<iocb*> pointers;
|
||||
std::vector<std::vector<size_t>> blocks_to_indices;
|
||||
requests.reserve(index_to_out.size());
|
||||
pointers.reserve(index_to_out.size());
|
||||
blocks_to_indices.reserve(index_to_out.size());
|
||||
for (size_t i = 0; i < index_to_out.size(); ++i)
|
||||
{
|
||||
if (!requests.empty() &&
|
||||
static_cast<size_t>(requests.back().aio_offset) == index_to_out[i].first.getBlockId() * SSD_BLOCK_SIZE)
|
||||
{
|
||||
blocks_to_indices.back().push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
iocb request{};
|
||||
#if defined(__FreeBSD__)
|
||||
request.aio.aio_lio_opcode = LIO_READ;
|
||||
request.aio.aio_fildes = read_fd;
|
||||
request.aio.aio_buf = reinterpret_cast<volatile void *>(read_buffer.data() + i * MAX_ATTRIBUTES_SIZE);
|
||||
request.aio.aio_nbytes = MAX_ATTRIBUTES_SIZE;
|
||||
request.aio.aio_fildes = fd;
|
||||
request.aio.aio_buf = reinterpret_cast<volatile void *>(
|
||||
reinterpret_cast<UInt64>(read_buffer.data()) + SSD_BLOCK_SIZE * (i % MAX_BLOCKS_TO_KEEP_IN_MEMORY));
|
||||
request.aio.aio_nbytes = SSD_BLOCK_SIZE;
|
||||
request.aio.aio_offset = index_to_out[i].first;
|
||||
request.aio_data = i;
|
||||
#else
|
||||
requests[i].aio_lio_opcode = IOCB_CMD_PREAD;
|
||||
requests[i].aio_fildes = read_fd;
|
||||
requests[i].aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + i * SSD_BLOCK_SIZE;
|
||||
requests[i].aio_nbytes = SSD_BLOCK_SIZE;
|
||||
requests[i].aio_offset = index_to_out[i].first;
|
||||
requests[i].aio_data = i;
|
||||
request.aio_lio_opcode = IOCB_CMD_PREAD;
|
||||
request.aio_fildes = fd;
|
||||
request.aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + SSD_BLOCK_SIZE * (i % MAX_BLOCKS_TO_KEEP_IN_MEMORY);
|
||||
request.aio_nbytes = SSD_BLOCK_SIZE;
|
||||
request.aio_offset = index_to_out[i].first.getBlockId() * SSD_BLOCK_SIZE;
|
||||
request.aio_data = i;
|
||||
#endif
|
||||
requests.push_back(request);
|
||||
pointers.push_back(&requests.back());
|
||||
|
||||
Poco::Logger::get("requests:").information();
|
||||
pointers[i] = &requests[i];
|
||||
blocks_to_indices.emplace_back();
|
||||
blocks_to_indices.back().push_back(i);
|
||||
}
|
||||
|
||||
Poco::Logger::get("requests:").information(std::to_string(requests.size()));
|
||||
|
||||
//const auto pointers = ext::map<std::vector>(
|
||||
// std::begin(requests), std::end(requests), [](const iocb & request) { return &request; });
|
||||
AIOContext aio_context(MAX_BLOCKS_TO_KEEP_IN_MEMORY);
|
||||
|
||||
AIOContext context(MAX_KEYS_TO_READ_ONCE);
|
||||
std::vector<bool> processed(requests.size(), false);
|
||||
std::vector<io_event> events(requests.size());
|
||||
|
||||
std::vector<io_event> events(index_to_out.size());
|
||||
|
||||
for (size_t i = 0; i < index_to_out.size(); i += MAX_KEYS_TO_READ_ONCE)
|
||||
size_t to_push = 0;
|
||||
size_t to_pop = 0;
|
||||
while (to_pop < requests.size())
|
||||
{
|
||||
size_t to_push = std::min(MAX_KEYS_TO_READ_ONCE, index_to_out.size() - i);
|
||||
size_t push_index = i;
|
||||
int pushed = 0;
|
||||
while (to_push > 0 && (pushed = io_submit(context.ctx, to_push, pointers.data() + push_index)) < 0)
|
||||
/// get io tasks from previous iteration
|
||||
size_t popped = 0;
|
||||
while (to_pop < to_push && (popped = io_getevents(aio_context.ctx, to_push - to_pop, to_push - to_pop, &events[to_pop], nullptr)) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
|
||||
to_push -= pushed;
|
||||
push_index += pushed;
|
||||
pushed = 0;
|
||||
}
|
||||
|
||||
size_t to_get = std::min(MAX_KEYS_TO_READ_ONCE, index_to_out.size() - i);
|
||||
size_t got_index = i;
|
||||
int got = 0;
|
||||
while (to_get > 0 && (got = io_getevents(context.ctx, to_get, to_get, events.data() + got_index, NULL)) < 0)
|
||||
for (size_t i = to_pop; i < to_pop + popped; ++i)
|
||||
{
|
||||
const auto request_id = events[i].data;
|
||||
const auto & request = requests[request_id];
|
||||
if (events[i].res != static_cast<ssize_t>(request.aio_nbytes))
|
||||
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". returned: " + std::to_string(events[i].res), ErrorCodes::AIO_WRITE_ERROR);
|
||||
|
||||
for (const size_t idx : blocks_to_indices[request_id])
|
||||
{
|
||||
const auto & [file_index, out_index] = index_to_out[idx];
|
||||
DB::ReadBufferFromMemory buf(
|
||||
reinterpret_cast<char *>(request.aio_buf) + file_index.getAddressInBlock(),
|
||||
SSD_BLOCK_SIZE - file_index.getAddressInBlock());
|
||||
readValueFromBuffer(attribute_index, out[out_index], buf);
|
||||
|
||||
Poco::Logger::get("kek").information(std::to_string(file_index.getAddressInBlock()) + " " + std::to_string(file_index.getBlockId()));
|
||||
}
|
||||
|
||||
processed[request_id] = true;
|
||||
}
|
||||
|
||||
while (to_pop < requests.size() && processed[to_pop])
|
||||
++to_pop;
|
||||
|
||||
/// add new io tasks
|
||||
const size_t new_tasks_count = std::min(MAX_BLOCKS_TO_KEEP_IN_MEMORY - (to_push - to_pop), requests.size() - to_push);
|
||||
|
||||
size_t pushed = 0;
|
||||
while (new_tasks_count > 0 && (pushed = io_submit(aio_context.ctx, new_tasks_count, &pointers[to_push])) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
throwFromErrno("io_getevents: Failed to get an event from asynchronous IO", ErrorCodes::CANNOT_IO_GETEVENTS);
|
||||
to_get -= got;
|
||||
got_index += got;
|
||||
got = 0;
|
||||
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
|
||||
}
|
||||
}
|
||||
|
||||
//std::sort(std::begin(events), std::end(events), [](const auto & lhs, const auto & rhs) { return lhs.data < rhs.data; });
|
||||
for (const auto & event : events)
|
||||
{
|
||||
Poco::Logger::get("Read:").information("ito: f:" + std::to_string(index_to_out[event.data].first) + " s:" + std::to_string(index_to_out[event.data].second));
|
||||
Poco::Logger::get("Read:").information("data: " + std::to_string(event.data) + " res: " + std::to_string(event.res));
|
||||
|
||||
DB::ReadBufferFromMemory buf(read_buffer.data() + event.data * SSD_BLOCK_SIZE, event.res);
|
||||
readValueFromBuffer(attribute_index, out[index_to_out[event.data].second], buf);
|
||||
to_push += pushed;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,8 +140,7 @@ private:
|
||||
std::string path;
|
||||
|
||||
//mutable std::shared_mutex rw_lock;
|
||||
//int index_fd;
|
||||
mutable int read_fd = -1;
|
||||
int fd = -1;
|
||||
|
||||
struct KeyMetadata final
|
||||
{
|
||||
@ -164,7 +163,6 @@ private:
|
||||
|
||||
Attribute keys_buffer;
|
||||
Attributes attributes_buffer;
|
||||
//MutableColumns buffer;
|
||||
|
||||
DB::Memory<> memory;
|
||||
std::optional<DB::WriteBuffer> write_buffer;
|
||||
|
Loading…
Reference in New Issue
Block a user