Merge branch 'master' into distinct_in_order_wo_order_by

This commit is contained in:
Igor Nikonov 2022-09-23 11:34:54 +02:00 committed by GitHub
commit 5aa92e470a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 226 additions and 8 deletions

View File

@ -7,11 +7,15 @@
#include <base/unaligned.h>
#include <Core/Field.h>
#include <Common/assert_cast.h>
#include <Common/TargetSpecific.h>
#include <Core/TypeId.h>
#include <base/TypeName.h>
#include "config_core.h"
#if USE_MULTITARGET_CODE
# include <immintrin.h>
#endif
namespace DB
{
@ -391,6 +395,124 @@ protected:
Container data;
};
DECLARE_DEFAULT_CODE(
template <typename Container, typename Type>
inline void vectorIndexImpl(const Container & data, const PaddedPODArray<Type> & indexes, size_t limit, Container & res_data)
{
for (size_t i = 0; i < limit; ++i)
res_data[i] = data[indexes[i]];
}
);
DECLARE_AVX512VBMI_SPECIFIC_CODE(
template <typename Container, typename Type>
inline void vectorIndexImpl(const Container & data, const PaddedPODArray<Type> & indexes, size_t limit, Container & res_data)
{
static constexpr UInt64 MASK64 = 0xffffffffffffffff;
const size_t limit64 = limit & ~63;
size_t pos = 0;
size_t data_size = data.size();
auto data_pos = reinterpret_cast<const UInt8 *>(data.data());
auto indexes_pos = reinterpret_cast<const UInt8 *>(indexes.data());
auto res_pos = reinterpret_cast<UInt8 *>(res_data.data());
if (data_size <= 64)
{
/// one single mask load for table size <= 64
__mmask64 last_mask = MASK64 >> (64 - data_size);
__m512i table1 = _mm512_maskz_loadu_epi8(last_mask, data_pos);
/// 64 bytes table lookup using one single permutexvar_epi8
while (pos < limit64)
{
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
__m512i out = _mm512_permutexvar_epi8(vidx, table1);
_mm512_storeu_epi8(res_pos + pos, out);
pos += 64;
}
/// tail handling
if (limit > limit64)
{
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
__m512i out = _mm512_permutexvar_epi8(vidx, table1);
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
}
}
else if (data_size <= 128)
{
/// table size (64, 128] requires 2 zmm load
__mmask64 last_mask = MASK64 >> (128 - data_size);
__m512i table1 = _mm512_loadu_epi8(data_pos);
__m512i table2 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 64);
/// 128 bytes table lookup using one single permute2xvar_epi8
while (pos < limit64)
{
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
__m512i out = _mm512_permutex2var_epi8(table1, vidx, table2);
_mm512_storeu_epi8(res_pos + pos, out);
pos += 64;
}
if (limit > limit64)
{
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
__m512i out = _mm512_permutex2var_epi8(table1, vidx, table2);
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
}
}
else
{
if (data_size > 256)
{
/// byte index will not exceed 256 boundary.
data_size = 256;
}
__m512i table1 = _mm512_loadu_epi8(data_pos);
__m512i table2 = _mm512_loadu_epi8(data_pos + 64);
__m512i table3, table4;
if (data_size <= 192)
{
/// only 3 tables need to load if size <= 192
__mmask64 last_mask = MASK64 >> (192 - data_size);
table3 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 128);
table4 = _mm512_setzero_si512();
}
else
{
__mmask64 last_mask = MASK64 >> (256 - data_size);
table3 = _mm512_loadu_epi8(data_pos + 128);
table4 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 192);
}
/// 256 bytes table lookup can use: 2 permute2xvar_epi8 plus 1 blender with MSB
while (pos < limit64)
{
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
__m512i tmp1 = _mm512_permutex2var_epi8(table1, vidx, table2);
__m512i tmp2 = _mm512_permutex2var_epi8(table3, vidx, table4);
__mmask64 msb = _mm512_movepi8_mask(vidx);
__m512i out = _mm512_mask_blend_epi8(msb, tmp1, tmp2);
_mm512_storeu_epi8(res_pos + pos, out);
pos += 64;
}
if (limit > limit64)
{
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
__m512i tmp1 = _mm512_permutex2var_epi8(table1, vidx, table2);
__m512i tmp2 = _mm512_permutex2var_epi8(table3, vidx, table4);
__mmask64 msb = _mm512_movepi8_mask(vidx);
__m512i out = _mm512_mask_blend_epi8(msb, tmp1, tmp2);
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
}
}
}
);
template <typename T>
template <typename Type>
ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
@ -399,8 +521,18 @@ ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_
auto res = this->create(limit);
typename Self::Container & res_data = res->getData();
for (size_t i = 0; i < limit; ++i)
res_data[i] = data[indexes[i]];
#if USE_MULTITARGET_CODE
if constexpr (sizeof(T) == 1 && sizeof(Type) == 1)
{
/// VBMI optimization only applicable for (U)Int8 types
if (isArchSupported(TargetArch::AVX512VBMI))
{
TargetSpecific::AVX512VBMI::vectorIndexImpl<Container, Type>(data, indexes, limit, res_data);
return res;
}
}
#endif
TargetSpecific::Default::vectorIndexImpl<Container, Type>(data, indexes, limit, res_data);
return res;
}

View File

@ -1,10 +1,10 @@
#include <limits>
#include <typeinfo>
#include <vector>
#include <Columns/ColumnsNumber.h>
#include <Common/randomSeed.h>
#include <gtest/gtest.h>
using namespace DB;
static pcg64 rng(randomSeed());
@ -76,7 +76,6 @@ static void testFilter()
}
}
TEST(ColumnVector, Filter)
{
testFilter<UInt8>();
@ -89,3 +88,71 @@ TEST(ColumnVector, Filter)
testFilter<Float64>();
testFilter<UUID>();
}
template <typename T>
static MutableColumnPtr createIndexColumn(size_t limit, size_t rows)
{
auto column = ColumnVector<T>::create();
auto & values = column->getData();
auto max = std::numeric_limits<T>::max();
limit = limit > max ? max : limit;
for (size_t i = 0; i < rows; ++i)
{
T val = rng() % limit;
values.push_back(val);
}
return column;
}
template <typename T, typename IndexType>
static void testIndex()
{
static const std::vector<size_t> column_sizes = {64, 128, 196, 256, 512};
auto test_case = [&](size_t rows, size_t index_rows, size_t limit)
{
auto vector_column = createColumn<T>(rows);
auto index_column = createIndexColumn<IndexType>(rows, index_rows);
auto res_column = vector_column->index(*index_column, limit);
if (limit == 0)
limit = index_column->size();
/// check results
if (limit != res_column->size())
throw Exception(error_code, "ColumnVector index size not match to limit: {} {}", typeid(T).name(), typeid(IndexType).name());
for (size_t i = 0; i < limit; ++i)
{
/// vector_column data is the same as index, so indexed column's value will equals to index_column.
if (res_column->get64(i) != index_column->get64(i))
throw Exception(error_code, "ColumnVector index fail: {} {}", typeid(T).name(), typeid(IndexType).name());
}
};
try
{
for (size_t i = 0; i < TEST_RUNS; ++i)
{
/// make sure rows distribute in (column_sizes[r-1], colulmn_sizes[r]]
size_t row_idx = rng() % column_sizes.size();
size_t row_base = row_idx > 0 ? column_sizes[row_idx - 1] : 0;
size_t rows = row_base + (rng() % (column_sizes[row_idx] - row_base) + 1);
size_t index_rows = rng() % MAX_ROWS + 1;
test_case(rows, index_rows, 0);
test_case(rows, index_rows, 0.5 * index_rows);
}
}
catch (const Exception & e)
{
FAIL() << e.displayText();
}
}
TEST(ColumnVector, Index)
{
testIndex<UInt8, UInt8>();
testIndex<UInt16, UInt8>();
testIndex<UInt16, UInt16>();
}

View File

@ -1,15 +1,20 @@
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Common/filesystemHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DATABASE_ACCESS_DENIED;
}
namespace
{
const String DEFAULT_OUTPUT_FORMAT = "CSV";
@ -26,8 +31,19 @@ InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
database = context->getInsertionTable().getDatabaseName();
String path_in_setting = context->getSettingsRef().input_format_record_errors_file_path;
errors_file_path = context->getApplicationType() == Context::ApplicationType::SERVER ? context->getUserFilesPath() + path_in_setting
: path_in_setting;
if (context->getApplicationType() == Context::ApplicationType::SERVER)
{
auto user_files_path = context->getUserFilesPath();
errors_file_path = fs::path(user_files_path) / path_in_setting;
if (!fileOrSymlinkPathStartsWith(errors_file_path, user_files_path))
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Cannot log errors in path `{}`, because it is not inside `{}`", errors_file_path, user_files_path);
}
else
{
errors_file_path = path_in_setting;
}
while (fs::exists(errors_file_path))
{
errors_file_path += "_new";

View File

@ -0,0 +1,3 @@
insert into function file(02453_data.jsonl, TSV) select 1 settings engine_file_truncate_on_insert=1;
select * from file(02453_data.jsonl, auto, 'x UInt32') settings input_format_allow_errors_num=1, input_format_record_errors_file_path='../error_file'; -- {serverError DATABASE_ACCESS_DENIED}