mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 08:52:06 +00:00
fix
This commit is contained in:
parent
b62ac3aa80
commit
dbb565f34a
@ -5,6 +5,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/ProfilingScopedRWLock.h>
|
||||
#include <Common/MemorySanitizer.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include "DictionaryFactory.h"
|
||||
#include <IO/AIO.h>
|
||||
@ -29,11 +30,14 @@ namespace ProfileEvents
|
||||
extern const Event DictCacheLockWriteNs;
|
||||
extern const Event DictCacheLockReadNs;
|
||||
extern const Event FileOpen;
|
||||
extern const Event WriteBufferAIOWrite;
|
||||
extern const Event WriteBufferAIOWriteBytes;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric DictCacheRequests;
|
||||
extern const Metric Write;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -50,6 +54,8 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_OPEN_FILE;
|
||||
extern const int CANNOT_IO_SUBMIT;
|
||||
extern const int CANNOT_IO_GETEVENTS;
|
||||
extern const int AIO_WRITE_ERROR;
|
||||
extern const int CANNOT_FSYNC;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -57,7 +63,7 @@ namespace
|
||||
constexpr size_t MAX_KEYS_TO_READ_ONCE = 128;
|
||||
constexpr size_t SSD_BLOCK_SIZE = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
constexpr size_t MAX_ATTRIBUTES_SIZE = 1024;
|
||||
//constexpr size_t MAX_ATTRIBUTES_SIZE = 1024;
|
||||
|
||||
static constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
|
||||
static constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
|
||||
@ -177,7 +183,7 @@ CachePartition::CachePartition(
|
||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||
|
||||
const std::string filename = path + BIN_FILE_EXT;
|
||||
read_fd = ::open(filename.c_str(), O_RDONLY | O_DIRECT);
|
||||
read_fd = ::open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_DIRECT, 0666);
|
||||
if (read_fd == -1)
|
||||
{
|
||||
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
|
||||
@ -201,7 +207,7 @@ void CachePartition::appendBlock(const Attribute & new_keys, const Attributes &
|
||||
appendValuesToAttribute(keys_buffer, new_keys);
|
||||
|
||||
if (!write_buffer)
|
||||
write_buffer.emplace(memory.data(), memory.size());
|
||||
write_buffer.emplace(memory.data(), SSD_BLOCK_SIZE);
|
||||
|
||||
for (size_t index = 0; index < ids.size();)
|
||||
{
|
||||
@ -213,13 +219,15 @@ void CachePartition::appendBlock(const Attribute & new_keys, const Attributes &
|
||||
for (const auto & attribute : new_attributes)
|
||||
{
|
||||
// TODO:: переделать через столбцы + getDataAt
|
||||
switch (attribute.type) {
|
||||
switch (attribute.type)
|
||||
{
|
||||
#define DISPATCH(TYPE) \
|
||||
case AttributeUnderlyingType::ut##TYPE: \
|
||||
{ \
|
||||
if (sizeof(TYPE) > write_buffer->available()) \
|
||||
{ \
|
||||
flush(); \
|
||||
write_buffer.emplace(memory.data(), SSD_BLOCK_SIZE); \
|
||||
continue; \
|
||||
} \
|
||||
else \
|
||||
@ -251,6 +259,9 @@ void CachePartition::appendBlock(const Attribute & new_keys, const Attributes &
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
flush();
|
||||
write_buffer.emplace(memory.data(), SSD_BLOCK_SIZE);
|
||||
++index;
|
||||
}
|
||||
}
|
||||
@ -296,73 +307,80 @@ size_t CachePartition::appendValuesToAttribute(Attribute & to, const Attribute &
|
||||
|
||||
void CachePartition::flush()
|
||||
{
|
||||
if (!write_data_buffer)
|
||||
{
|
||||
//write_data_buffer = std::make_unique<WriteBufferAIO>(path + BIN_FILE_EXT, buffer_size, O_RDWR | O_CREAT | O_TRUNC);
|
||||
// TODO: не перетирать + seek в конец файла
|
||||
}
|
||||
|
||||
write_buffer.reset();
|
||||
const auto & ids = std::get<Attribute::Container<UInt64>>(keys_buffer.values);
|
||||
if (ids.empty())
|
||||
return;
|
||||
|
||||
Poco::Logger::get("paritiiton").information("@@@@@@@@@@@@@@@@@@@@ FLUSH!!!");
|
||||
|
||||
std::vector<size_t> offsets;
|
||||
AIOContext aio_context{1};
|
||||
|
||||
size_t prev_size = 0;
|
||||
for (size_t row = 0; row < ids.size(); ++row)
|
||||
iocb write_request;
|
||||
memset(&write_request, 0, sizeof(write_request));
|
||||
iocb * write_request_ptr{&write_request};
|
||||
|
||||
#if defined(__FreeBSD__)
|
||||
write_request.aio.aio_lio_opcode = LIO_WRITE;
|
||||
write_request.aio.aio_fildes = fd;
|
||||
write_request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
|
||||
write_request.aio.aio_nbytes = region_aligned_size;
|
||||
write_request.aio.aio_offset = region_aligned_begin;
|
||||
#else
|
||||
write_request.aio_lio_opcode = IOCB_CMD_PWRITE;
|
||||
write_request.aio_fildes = read_fd;
|
||||
write_request.aio_buf = reinterpret_cast<UInt64>(memory.data());
|
||||
write_request.aio_nbytes = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
write_request.aio_offset = DEFAULT_AIO_FILE_BLOCK_SIZE * current_file_block_id;
|
||||
#endif
|
||||
|
||||
Poco::Logger::get("try:").information("offset: " + std::to_string(write_request.aio_offset) + " nbytes: " + std::to_string(write_request.aio_nbytes));
|
||||
|
||||
while (io_submit(aio_context.ctx, 1, &write_request_ptr) < 0)
|
||||
{
|
||||
offsets.push_back((offsets.empty() ? write_data_buffer->getPositionInFile() : offsets.back()) + prev_size);
|
||||
prev_size = 0;
|
||||
|
||||
for (size_t col = 0; col < attributes_buffer.size(); ++col)
|
||||
{
|
||||
const auto & attribute = attributes_buffer[col];
|
||||
|
||||
switch (attribute.type)
|
||||
{
|
||||
#define DISPATCH(TYPE) \
|
||||
case AttributeUnderlyingType::ut##TYPE: \
|
||||
{ \
|
||||
const auto & values = std::get<Attribute::Container<TYPE>>(attribute.values); \
|
||||
writeBinary(values[row], *static_cast<WriteBuffer*>(write_data_buffer.get())); \
|
||||
} \
|
||||
break;
|
||||
|
||||
DISPATCH(UInt8)
|
||||
DISPATCH(UInt16)
|
||||
DISPATCH(UInt32)
|
||||
DISPATCH(UInt64)
|
||||
DISPATCH(UInt128)
|
||||
DISPATCH(Int8)
|
||||
DISPATCH(Int16)
|
||||
DISPATCH(Int32)
|
||||
DISPATCH(Int64)
|
||||
DISPATCH(Decimal32)
|
||||
DISPATCH(Decimal64)
|
||||
DISPATCH(Decimal128)
|
||||
DISPATCH(Float32)
|
||||
DISPATCH(Float64)
|
||||
#undef DISPATCH
|
||||
|
||||
case AttributeUnderlyingType::utString:
|
||||
// TODO: string support
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (errno != EINTR)
|
||||
throw Exception("Cannot submit request for asynchronous IO on file " + path + BIN_FILE_EXT, ErrorCodes::CANNOT_IO_SUBMIT);
|
||||
}
|
||||
write_data_buffer->sync();
|
||||
|
||||
CurrentMetrics::Increment metric_increment_write{CurrentMetrics::Write};
|
||||
|
||||
io_event event;
|
||||
while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
throw Exception("Failed to wait for asynchronous IO completion on file " + path + BIN_FILE_EXT, ErrorCodes::CANNOT_IO_GETEVENTS);
|
||||
}
|
||||
|
||||
// Unpoison the memory returned from an uninstrumented system function.
|
||||
__msan_unpoison(&event, sizeof(event));
|
||||
|
||||
ssize_t bytes_written;
|
||||
#if defined(__FreeBSD__)
|
||||
bytes_written = aio_return(reinterpret_cast<struct aiocb *>(event.udata));
|
||||
#else
|
||||
bytes_written = event.res;
|
||||
#endif
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
|
||||
|
||||
if (bytes_written != static_cast<decltype(bytes_written)>(write_request.aio_nbytes))
|
||||
throw Exception("Not all data was written for asynchronous IO on file " + path + BIN_FILE_EXT + ". returned: " + std::to_string(bytes_written), ErrorCodes::AIO_WRITE_ERROR);
|
||||
|
||||
int res = ::fsync(read_fd);
|
||||
if (res == -1)
|
||||
throwFromErrnoWithPath("Cannot fsync " + path, path, ErrorCodes::CANNOT_FSYNC);
|
||||
|
||||
/// commit changes in index
|
||||
for (size_t row = 0; row < ids.size(); ++row)
|
||||
{
|
||||
key_to_metadata[ids[row]].index.setInMemory(false);
|
||||
key_to_metadata[ids[row]].index.setBlockId(current_file_block_id);
|
||||
key_to_metadata[ids[row]].index.setAddressInBlock(offsets[row]);
|
||||
Poco::Logger::get("INDEX:").information("NEW MAP: " + std::to_string(ids[row]) + " -> " + std::to_string(key_to_metadata[ids[row]].index.index));
|
||||
}
|
||||
|
||||
++current_file_block_id;
|
||||
|
||||
/// clear buffer
|
||||
std::visit([](auto & attr) { attr.clear(); }, keys_buffer.values);
|
||||
for (auto & attribute : attributes_buffer)
|
||||
@ -409,7 +427,7 @@ void CachePartition::getValueFromMemory(
|
||||
|
||||
Poco::Logger::get("part:").information("GET FROM MEMORY " + std::to_string(i) + " --- " + std::to_string(offset));
|
||||
|
||||
ReadBufferFromMemory read_buffer(memory.data() + offset, memory.size() - offset);
|
||||
ReadBufferFromMemory read_buffer(memory.data() + offset, SSD_BLOCK_SIZE - offset);
|
||||
readValueFromBuffer(attribute_index, out[i], read_buffer);
|
||||
}
|
||||
}
|
||||
@ -431,7 +449,7 @@ void CachePartition::getValueFromStorage(
|
||||
|
||||
std::sort(std::begin(index_to_out), std::end(index_to_out));
|
||||
|
||||
DB::Memory read_buffer(MAX_ATTRIBUTES_SIZE * index_to_out.size(), BUFFER_ALIGNMENT);
|
||||
DB::Memory read_buffer(SSD_BLOCK_SIZE * index_to_out.size(), BUFFER_ALIGNMENT);
|
||||
|
||||
std::vector<iocb> requests(index_to_out.size());
|
||||
memset(requests.data(), 0, requests.size() * sizeof(requests.front()));
|
||||
@ -448,8 +466,8 @@ void CachePartition::getValueFromStorage(
|
||||
#else
|
||||
requests[i].aio_lio_opcode = IOCB_CMD_PREAD;
|
||||
requests[i].aio_fildes = read_fd;
|
||||
requests[i].aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + i * MAX_ATTRIBUTES_SIZE;
|
||||
requests[i].aio_nbytes = MAX_ATTRIBUTES_SIZE;
|
||||
requests[i].aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + i * SSD_BLOCK_SIZE;
|
||||
requests[i].aio_nbytes = SSD_BLOCK_SIZE;
|
||||
requests[i].aio_offset = index_to_out[i].first;
|
||||
requests[i].aio_data = i;
|
||||
#endif
|
||||
@ -499,7 +517,7 @@ void CachePartition::getValueFromStorage(
|
||||
Poco::Logger::get("Read:").information("ito: f:" + std::to_string(index_to_out[event.data].first) + " s:" + std::to_string(index_to_out[event.data].second));
|
||||
Poco::Logger::get("Read:").information("data: " + std::to_string(event.data) + " res: " + std::to_string(event.res));
|
||||
|
||||
DB::ReadBufferFromMemory buf(read_buffer.data() + event.data * MAX_ATTRIBUTES_SIZE, event.res);
|
||||
DB::ReadBufferFromMemory buf(read_buffer.data() + event.data * SSD_BLOCK_SIZE, event.res);
|
||||
readValueFromBuffer(attribute_index, out[index_to_out[event.data].second], buf);
|
||||
}
|
||||
}
|
||||
|
@ -143,8 +143,6 @@ private:
|
||||
//int index_fd;
|
||||
mutable int read_fd = -1;
|
||||
|
||||
std::unique_ptr<WriteBufferAIO> write_data_buffer;
|
||||
|
||||
struct KeyMetadata final
|
||||
{
|
||||
using time_point_t = std::chrono::system_clock::time_point;
|
||||
|
Loading…
Reference in New Issue
Block a user