fix other os

This commit is contained in:
Nikita Vasilev 2020-05-24 11:56:34 +03:00
parent 7358410a84
commit c70401b1e4
6 changed files with 81 additions and 14 deletions

View File

@ -1,3 +1,5 @@
#if defined(__linux__) || defined(__FreeBSD__)
#include "SSDCacheDictionary.h"
#include <algorithm>
@ -407,7 +409,7 @@ void SSDCachePartition::flush()
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
if (bytes_written != static_cast<decltype(bytes_written)>(write_request.aio_nbytes))
if (bytes_written != static_cast<decltype(bytes_written)>(block_size * write_buffer_size))
throw Exception("Not all data was written for asynchronous IO on file " + path + BIN_FILE_EXT + ". returned: " + std::to_string(bytes_written), ErrorCodes::AIO_WRITE_ERROR);
if (::fsync(fd) < 0)
@ -574,8 +576,14 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
blocks_to_indices.reserve(index_to_out.size());
for (size_t i = 0; i < index_to_out.size(); ++i)
{
#if defined(__FreeBSD__)
const auto back_offset = requests.back().aio.aio_offset;
#else
const auto back_offset = requests.back().aio_offset;
#endif
if (!requests.empty() &&
static_cast<size_t>(requests.back().aio_offset) == index_to_out[i].first.getBlockId() * block_size)
static_cast<size_t>(back_offset) == index_to_out[i].first.getBlockId() * block_size)
{
blocks_to_indices.back().push_back(i);
continue;
@ -586,9 +594,9 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
request.aio.aio_lio_opcode = LIO_READ;
request.aio.aio_fildes = fd;
request.aio.aio_buf = reinterpret_cast<volatile void *>(
reinterpret_cast<UInt64>(read_buffer.data()) + SSD_BLOCK_SIZE * (requests.size() % READ_BUFFER_SIZE_BLOCKS));
request.aio.aio_nbytes = SSD_BLOCK_SIZE;
request.aio.aio_offset = index_to_out[i].first;
reinterpret_cast<UInt64>(read_buffer.data()) + block_size * (requests.size() % read_buffer_size));
request.aio.aio_nbytes = block_size;
request.aio.aio_offset = index_to_out[i].first.getBlockId() * block_size;
request.aio_data = requests.size();
#else
request.aio_lio_opcode = IOCB_CMD_PREAD;
@ -608,8 +616,13 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
std::vector<bool> processed(requests.size(), false);
std::vector<io_event> events(requests.size());
#if defined(__FreeBSD__)
for (auto & event : events)
event.udata = -1;
#else
for (auto & event : events)
event.res = -1;
#endif
size_t to_push = 0;
size_t to_pop = 0;
@ -626,18 +639,34 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
{
const auto request_id = events[i].data;
const auto & request = requests[request_id];
if (events[i].res != static_cast<ssize_t>(request.aio_nbytes))
#if defined(__FreeBSD__)
const auto bytes_written = aio_return(reinterpret_cast<struct aiocb *>(events[i].udata));
#else
const auto bytes_written = events[i].res;
#endif
if (bytes_written != static_cast<ssize_t>(block_size))
{
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". " +
"request_id= " + std::to_string(request.aio_data) + "/ " + std::to_string(requests.size()) +
", aio_nbytes=" + std::to_string(request.aio_nbytes) + ", aio_offset=" + std::to_string(request.aio_offset) +
", returned=" + std::to_string(events[i].res) + ", errno=" + std::to_string(errno), ErrorCodes::AIO_READ_ERROR);
#if defined(__FreeBSD__)
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ".");
#else
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". " +
"request_id= " + std::to_string(request.aio_data) + "/ " + std::to_string(requests.size()) +
", aio_nbytes=" + std::to_string(request.aio_nbytes) + ", aio_offset=" + std::to_string(request.aio_offset) +
", returned=" + std::to_string(events[i].res) + ", errno=" + std::to_string(errno), ErrorCodes::AIO_READ_ERROR);
#endif
}
__msan_unpoison(reinterpret_cast<char *>(request.aio_buf), request.aio_nbytes);
#if defined(__FreeBSD__)
const auto* buf_ptr = reinterpret_cast<char *>(request.aio.aio_buf);
#else
const auto* buf_ptr = reinterpret_cast<char *>(request.aio_buf);
#endif
__msan_unpoison(buf_ptr, block_size);
uint64_t checksum = 0;
ReadBufferFromMemory buf_special(reinterpret_cast<char *>(request.aio_buf), block_size);
ReadBufferFromMemory buf_special(buf_ptr, block_size);
readBinary(checksum, buf_special);
uint64_t calculated_checksum = CityHash_v1_0_2::CityHash64(reinterpret_cast<char *>(request.aio_buf) + BLOCK_CHECKSUM_SIZE, block_size - BLOCK_CHECKSUM_SIZE);
uint64_t calculated_checksum = CityHash_v1_0_2::CityHash64(buf_ptr + BLOCK_CHECKSUM_SIZE, block_size - BLOCK_CHECKSUM_SIZE);
if (checksum != calculated_checksum)
{
throw Exception("Cache data corrupted. From block = " + std::to_string(checksum) + " calculated = " + std::to_string(calculated_checksum) + ".", ErrorCodes::CORRUPTED_DATA);
@ -647,7 +676,7 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
{
const auto & [file_index, out_index] = index_to_out[idx];
ReadBufferFromMemory buf(
reinterpret_cast<char *>(request.aio_buf) + file_index.getAddressInBlock(),
buf_ptr + file_index.getAddressInBlock(),
block_size - file_index.getAddressInBlock());
set(out_index, buf);
}
@ -1667,3 +1696,5 @@ void registerDictionarySSDCache(DictionaryFactory & factory)
}
}
#endif

View File

@ -1,5 +1,7 @@
#pragma once
#if defined(__linux__) || defined(__FreeBSD__)
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
@ -454,3 +456,5 @@ private:
};
}
#endif

View File

@ -1,3 +1,5 @@
#if defined(__linux__) || defined(__FreeBSD__)
#include "SSDComplexKeyCacheDictionary.h"
#include <algorithm>
@ -1770,3 +1772,5 @@ void registerDictionarySSDComplexKeyCache(DictionaryFactory & factory)
}
}
#endif

View File

@ -1,5 +1,7 @@
#pragma once
#if defined(__linux__) || defined(__FreeBSD__)
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
@ -685,3 +687,5 @@ private:
};
}
#endif

View File

@ -25,8 +25,10 @@ void registerDictionaryTrie(DictionaryFactory & factory);
void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
void registerDictionaryCache(DictionaryFactory & factory);
#if defined(__linux__) || defined(__FreeBSD__)
void registerDictionarySSDCache(DictionaryFactory & factory);
void registerDictionarySSDComplexKeyCache(DictionaryFactory & factory);
#endif
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaryDirect(DictionaryFactory & factory);

View File

@ -29,8 +29,10 @@
#include <Dictionaries/FlatDictionary.h>
#include <Dictionaries/HashedDictionary.h>
#include <Dictionaries/CacheDictionary.h>
#if defined(__linux__) || defined(__FreeBSD__)
#include <Dictionaries/SSDCacheDictionary.h>
#include <Dictionaries/SSDComplexKeyCacheDictionary.h>
#endif
#include <Dictionaries/ComplexKeyHashedDictionary.h>
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include <Dictionaries/ComplexKeyDirectDictionary.h>
@ -175,10 +177,14 @@ private:
if (!executeDispatchSimple<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatchSimple<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatchSimple<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchSimple<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
@ -327,10 +333,14 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
@ -506,10 +516,14 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
@ -841,10 +855,14 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
@ -1097,10 +1115,14 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&