diff --git a/dbms/src/Functions/CMakeLists.txt b/dbms/src/Functions/CMakeLists.txt index d86e43cb435..347416a7a86 100644 --- a/dbms/src/Functions/CMakeLists.txt +++ b/dbms/src/Functions/CMakeLists.txt @@ -100,3 +100,5 @@ endif () if (ENABLE_TESTS) add_subdirectory (tests) endif () + +target_link_libraries (clickhouse_functions PRIVATE yandex-consistent-hashing) diff --git a/dbms/src/Functions/FunctionsConsistentHashing.cpp b/dbms/src/Functions/FunctionsConsistentHashing.cpp new file mode 100644 index 00000000000..abf789c6073 --- /dev/null +++ b/dbms/src/Functions/FunctionsConsistentHashing.cpp @@ -0,0 +1,14 @@ +#include "FunctionsConsistentHashing.h" +#include + + +namespace DB +{ + +void registerFunctionsConsistentHashing(FunctionFactory & factory) +{ + factory.registerFunction(); + factory.registerFunction(); +} + +} diff --git a/dbms/src/Functions/FunctionsConsistentHashing.h b/dbms/src/Functions/FunctionsConsistentHashing.h new file mode 100644 index 00000000000..8678281cbeb --- /dev/null +++ b/dbms/src/Functions/FunctionsConsistentHashing.h @@ -0,0 +1,173 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_COLUMN; + extern const int BAD_ARGUMENTS; +} + + +struct YandexConsistentHashImpl +{ + static constexpr auto name = "YandexConsistentHash"; + + /// Actually it supports UInt64, but it is effective only if n < 65536 + using ResultType = UInt32; + using BucketsCountType = ResultType; + + static inline ResultType apply(UInt64 hash, BucketsCountType n) + { + return ConsistentHashing(hash, n); + } +}; + + +/// Code from https://arxiv.org/pdf/1406.2294.pdf +static inline int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) { + int64_t b = -1, j = 0; + while (j < num_buckets) { + b = j; + key = key * 2862933555777941757ULL + 1; + j = static_cast((b + 1) * (double(1LL << 31) / double((key >> 33) + 1))); + } + return static_cast(b); +} + +struct JumpConsistentHashImpl +{ + static constexpr auto name = "JumpConsistentHash"; + + using ResultType = Int32; + using BucketsCountType = ResultType; + + static inline ResultType apply(UInt64 hash, BucketsCountType n) + { + return JumpConsistentHash(hash, n); + } +}; + + +template +class FunctionConsistentHashImpl : public IFunction +{ +public: + + static constexpr auto name = Impl::name; + using ResultType = typename Impl::ResultType; + using BucketsType = typename Impl::BucketsCountType; + + static FunctionPtr create(const Context &) { return std::make_shared>(); }; + + String getName() const override { return name; } + + size_t getNumberOfArguments() const override { return 2; } + + DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override + { + if (!arguments[0]->isInteger()) + throw Exception("Illegal type " + arguments[0]->getName() + " of the first argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + if (!arguments[1]->isInteger()) + throw Exception("Illegal type " + arguments[1]->getName() + " of the second argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return std::make_shared>(); + } + + void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override + { + auto buckets_col = block.getByPosition(arguments[1]).column.get(); + if (!buckets_col->isColumnConst()) + throw Exception("The second argument of function " + getName() + " (number of buckets) must be constant", ErrorCodes::BAD_ARGUMENTS); + + constexpr UInt64 max_buckets = static_cast(std::numeric_limits::max()); + UInt64 num_buckets; + + auto check_range = [&] (auto buckets) + { + if (buckets <= 0) + throw Exception("The second argument of function " + getName() + " (number of buckets) must be positive number", + ErrorCodes::BAD_ARGUMENTS); + + if (static_cast(buckets) > max_buckets) + throw Exception("The value of the second argument of function " + getName() + " (number of buckets) is not fit to " + + DataTypeNumber().getName(), ErrorCodes::BAD_ARGUMENTS); + + num_buckets = static_cast(buckets); + }; + + Field buckets_field = (*buckets_col)[0]; + if (buckets_field.getType() == Field::Types::Int64) + check_range(buckets_field.safeGet()); + else if (buckets_field.getType() == Field::Types::UInt64) + check_range(buckets_field.safeGet()); + else + throw Exception("Illegal type " + String(buckets_field.getTypeName()) + " of the second argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + + const auto & hash_col_source = block.getByPosition(arguments[0]).column; + ColumnPtr hash_col = (hash_col_source->isColumnConst()) ? hash_col_source->convertToFullColumnIfConst() : hash_col_source; + ColumnPtr & res_col = block.getByPosition(result).column; + + + const IDataType * hash_type = block.getByPosition(arguments[0]).type.get(); + + if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else if (checkDataType(hash_type)) executeType(hash_col, res_col, num_buckets); + else + throw Exception("Illegal type " + hash_type->getName() + " of the first argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + +private: + + template + void executeType(const ColumnPtr & col_hash_ptr, ColumnPtr & out_col_result, const UInt64 num_buckets) + { + auto col_hash = checkAndGetColumn>(col_hash_ptr.get()); + if (!col_hash) + throw Exception("Illegal type of the first argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + auto col_result = ColumnVector::create(); + typename ColumnVector::Container & vec_result = col_result->getData(); + const auto & vec_hash = col_hash->getData(); + + size_t size = vec_hash.size(); + vec_result.resize(size); + for (size_t i = 0; i < size; ++i) + vec_result[i] = Impl::apply(static_cast(vec_hash[i]), static_cast(num_buckets)); + + out_col_result = std::move(col_result); + } +}; + + +using FunctionYandexConsistentHash = FunctionConsistentHashImpl; +using FunctionJumpConsistentHas = FunctionConsistentHashImpl; + + +} diff --git a/dbms/src/Functions/registerFunctions.cpp b/dbms/src/Functions/registerFunctions.cpp index 365b4a730bf..0dcc66bfd77 100644 --- a/dbms/src/Functions/registerFunctions.cpp +++ b/dbms/src/Functions/registerFunctions.cpp @@ -24,6 +24,7 @@ void registerFunctionsExternalDictionaries(FunctionFactory &); void registerFunctionsExternalModels(FunctionFactory &); void registerFunctionsFormatting(FunctionFactory &); void registerFunctionsHashing(FunctionFactory &); +void registerFunctionsConsistentHashing(FunctionFactory &); void registerFunctionsHigherOrder(FunctionFactory &); void registerFunctionsLogical(FunctionFactory &); void registerFunctionsMiscellaneous(FunctionFactory &); @@ -60,6 +61,7 @@ void registerFunctions() registerFunctionsExternalModels(factory); registerFunctionsFormatting(factory); registerFunctionsHashing(factory); + registerFunctionsConsistentHashing(factory); registerFunctionsHigherOrder(factory); registerFunctionsLogical(factory); registerFunctionsMiscellaneous(factory); diff --git a/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.reference b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.reference new file mode 100644 index 00000000000..64458288805 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.reference @@ -0,0 +1,6 @@ +0 43 520 0 361 237 +0 1 1 3 111 173 +358 +341 +111 +111 diff --git a/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.sql b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.sql new file mode 100644 index 00000000000..1a2303d3072 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.sql @@ -0,0 +1,4 @@ +SELECT JumpConsistentHash(1, 1), JumpConsistentHash(42, 57), JumpConsistentHash(256, 1024), JumpConsistentHash(3735883980, 1), JumpConsistentHash(3735883980, 666), JumpConsistentHash(16045690984833335023, 255); +SELECT YandexConsistentHash(16045690984833335023, 1), YandexConsistentHash(16045690984833335023, 2), YandexConsistentHash(16045690984833335023, 3), YandexConsistentHash(16045690984833335023, 4), YandexConsistentHash(16045690984833335023, 173), YandexConsistentHash(16045690984833335023, 255); +SELECT JumpConsistentHash(intHash64(number), 787) FROM system.numbers LIMIT 1000000, 2; +SELECT YandexConsistentHash(16045690984833335023+number-number, 120) FROM system.numbers LIMIT 1000000, 2; diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 970d2be15b4..cf2e8464452 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -18,3 +18,5 @@ endif () if (USE_MYSQL) add_subdirectory (libmysqlxx) endif () + +add_subdirectory (yandex-consistent-hashing) diff --git a/libs/yandex-consistent-hashing/CMakeLists.txt b/libs/yandex-consistent-hashing/CMakeLists.txt new file mode 100644 index 00000000000..694c2d071d9 --- /dev/null +++ b/libs/yandex-consistent-hashing/CMakeLists.txt @@ -0,0 +1,5 @@ +cmake_minimum_required(VERSION 2.8) +project(yandex-consistent-hashing CXX) + +add_library(yandex-consistent-hashing yandex/consistent_hashing.cpp yandex/popcount.cpp) +target_include_directories(yandex-consistent-hashing PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) \ No newline at end of file diff --git a/libs/yandex-consistent-hashing/yandex/bitops.h b/libs/yandex-consistent-hashing/yandex/bitops.h new file mode 100644 index 00000000000..0ddb7f8024b --- /dev/null +++ b/libs/yandex-consistent-hashing/yandex/bitops.h @@ -0,0 +1,55 @@ +#pragma once +#include +#include +#include + +// Assume little endian + +inline uint16_t & LO_16(uint32_t & x) { return reinterpret_cast(&x)[0]; } +inline uint16_t & HI_16(uint32_t & x) { return reinterpret_cast(&x)[1]; } + +inline uint32_t & LO_32(uint64_t & x) { return reinterpret_cast(&x)[0]; } +inline uint32_t & HI_32(uint64_t & x) { return reinterpret_cast(&x)[1]; } + + +#if defined(__GNUC__) + inline unsigned GetValueBitCountImpl(unsigned int value) noexcept { + // Y_ASSERT(value); // because __builtin_clz* have undefined result for zero. + return std::numeric_limits::digits - __builtin_clz(value); + } + + inline unsigned GetValueBitCountImpl(unsigned long value) noexcept { + // Y_ASSERT(value); // because __builtin_clz* have undefined result for zero. + return std::numeric_limits::digits - __builtin_clzl(value); + } + + inline unsigned GetValueBitCountImpl(unsigned long long value) noexcept { + // Y_ASSERT(value); // because __builtin_clz* have undefined result for zero. + return std::numeric_limits::digits - __builtin_clzll(value); + } +#else + /// Stupid realization for non-GCC. Can use BSR from x86 instructions set. + template + inline unsigned GetValueBitCountImpl(T value) noexcept { + // Y_ASSERT(value); // because __builtin_clz* have undefined result for zero. + unsigned result = 1; // result == 0 - impossible value, see Y_ASSERT(). + value >>= 1; + while (value) { + value >>= 1; + ++result; + } + + return result; + } +#endif + + +/** + * Returns the number of leading 0-bits in `value`, starting at the most significant bit position. + */ +template +static inline unsigned GetValueBitCount(T value) noexcept { + // Y_ASSERT(value > 0); + using TCvt = std::make_unsigned_t>; + return GetValueBitCountImpl(static_cast(value)); +} diff --git a/libs/yandex-consistent-hashing/yandex/consistent_hashing.cpp b/libs/yandex-consistent-hashing/yandex/consistent_hashing.cpp new file mode 100644 index 00000000000..347456eede3 --- /dev/null +++ b/libs/yandex-consistent-hashing/yandex/consistent_hashing.cpp @@ -0,0 +1,125 @@ +#include "consistent_hashing.h" + +#include "bitops.h" + +#include "popcount.h" + +#include + +/* + * (all numbers are written in big-endian manner: the least significant digit on the right) + * (only bit representations are used - no hex or octal, leading zeroes are ommited) + * + * Consistent hashing scheme: + * + * (sizeof(TValue) * 8, y] (y, 0] + * a = * ablock + * b = * cblock + * + * (sizeof(TValue) * 8, k] (k, 0] + * c = * cblock + * + * d = * + * + * k - is determined by 2^(k-1) < n <= 2^k inequality + * z - is number of ones in cblock + * y - number of digits after first one in cblock + * + * The cblock determines logic of using a- and b- blocks: + * + * bits of cblock | result of a function + * 0 : 0 + * 1 : 1 (optimization, the next case includes this one) + * 1?..? : 1ablock (z is even) or 1bblock (z is odd) if possible (=n), than smooth moving from n=2^(k-1) to n=2^k is applied. + * Using "*" bits of a-,b-,c-,d- blocks uint64_t value is combined, modulo of which determines + * if the value should be greather than 2^(k-1) or ConsistentHashing(x, 2^(k-1)) should be used. + * The last case is optimized according to previous checks. + */ + +namespace { + +template +TValue PowerOf2(size_t k) { + return (TValue)0x1 << k; +} + +template +TValue SelectAOrBBlock(TValue a, TValue b, TValue cBlock) { + size_t z = PopCount(cBlock); + bool useABlock = z % 2 == 0; + return useABlock ? a : b; +} + +// Gets the exact result for n = k2 = 2 ^ k +template +size_t ConsistentHashingForPowersOf2(TValue a, TValue b, TValue c, TValue k2) { + TValue cBlock = c & (k2 - 1); // (k, 0] bits of c + // Zero and one cases + if (cBlock < 2) { + // First two cases of result function table: 0 if cblock is 0, 1 if cblock is 1. + return cBlock; + } + size_t y = GetValueBitCount(cBlock) - 1; // cblock = 0..01?..? (y = number of digits after 1), y > 0 + TValue y2 = PowerOf2(y); // y2 = 2^y + TValue abBlock = SelectAOrBBlock(a, b, cBlock) & (y2 - 1); + return y2 + abBlock; +} + +template +uint64_t GetAsteriskBits(TValue a, TValue b, TValue c, TValue d, size_t k) { + size_t shift = sizeof(TValue) * 8 - k; + uint64_t res = (d << shift) | (c >> k); + ++shift; + res <<= shift; + res |= b >> (k - 1); + res <<= shift; + res |= a >> (k - 1); + + return res; +} + +template +size_t ConsistentHashingImpl(TValue a, TValue b, TValue c, TValue d, size_t n) { + if (n <= 0) + throw std::runtime_error("Can't map consistently to a zero values."); + + // Uninteresting case + if (n == 1) { + return 0; + } + size_t k = GetValueBitCount(n - 1); // 2^(k-1) < n <= 2^k, k >= 1 + TValue k2 = PowerOf2(k); // k2 = 2^k + size_t largeValue; + { + // Bit determined variant. Large scheme. + largeValue = ConsistentHashingForPowersOf2(a, b, c, k2); + if (largeValue < n) { + return largeValue; + } + } + // Since largeValue is not assigned yet + // Smooth moving from one bit scheme to another + TValue k21 = PowerOf2(k - 1); + { + size_t s = GetAsteriskBits(a, b, c, d, k) % (largeValue * (largeValue + 1)); + size_t largeValue2 = s / k2 + k21; + if (largeValue2 < n) { + return largeValue2; + } + } + // Bit determined variant. Short scheme. + return ConsistentHashingForPowersOf2(a, b, c, k21); // Do not apply checks. It is always less than k21 = 2^(k-1) +} + +} // namespace // anonymous + +std::size_t ConsistentHashing(std::uint64_t x, std::size_t n) { + uint32_t lo = LO_32(x); + uint32_t hi = HI_32(x); + return ConsistentHashingImpl(LO_16(lo), HI_16(lo), LO_16(hi), HI_16(hi), n); +} +std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n) { + return ConsistentHashingImpl(LO_32(lo), HI_32(lo), LO_32(hi), HI_32(hi), n); +} diff --git a/libs/yandex-consistent-hashing/yandex/consistent_hashing.h b/libs/yandex-consistent-hashing/yandex/consistent_hashing.h new file mode 100644 index 00000000000..0ac2b01fcfb --- /dev/null +++ b/libs/yandex-consistent-hashing/yandex/consistent_hashing.h @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +/* + * Maps random ui64 x (in fact hash of some string) to n baskets/shards. + * Output value is id of a basket. 0 <= ConsistentHashing(x, n) < n. + * Probability of all baskets must be equal. Also, it should be consistent + * in terms, that with different n_1 < n_2 probability of + * ConsistentHashing(x, n_1) != ConsistentHashing(x, n_2) must be equal to + * (n_2 - n_1) / n_2 - the least possible with previous conditions. + * It requires O(1) memory and cpu to calculate. So, it is faster than classic + * consistent hashing algos with points on circle. + */ +std::size_t ConsistentHashing(std::uint64_t x, std::size_t n); // Works good for n < 65536 +std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n); // Works good for n < 4294967296 diff --git a/libs/yandex-consistent-hashing/yandex/popcount.cpp b/libs/yandex-consistent-hashing/yandex/popcount.cpp new file mode 100644 index 00000000000..66edfe65829 --- /dev/null +++ b/libs/yandex-consistent-hashing/yandex/popcount.cpp @@ -0,0 +1,25 @@ +#include "popcount.h" + +static const uint8_t PopCountLUT8Impl[1 << 8] = { +#define B2(n) n, n + 1, n + 1, n + 2 +#define B4(n) B2(n), B2(n + 1), B2(n + 1), B2(n + 2) +#define B6(n) B4(n), B4(n + 1), B4(n + 1), B4(n + 2) + B6(0), B6(1), B6(1), B6(2)}; + +uint8_t const* PopCountLUT8 = PopCountLUT8Impl; + +#if !defined(_MSC_VER) +//ICE here for msvc + +static const uint8_t PopCountLUT16Impl[1 << 16] = { +#define B2(n) n, n + 1, n + 1, n + 2 +#define B4(n) B2(n), B2(n + 1), B2(n + 1), B2(n + 2) +#define B6(n) B4(n), B4(n + 1), B4(n + 1), B4(n + 2) +#define B8(n) B6(n), B6(n + 1), B6(n + 1), B6(n + 2) +#define B10(n) B8(n), B8(n + 1), B8(n + 1), B8(n + 2) +#define B12(n) B10(n), B10(n + 1), B10(n + 1), B10(n + 2) +#define B14(n) B12(n), B12(n + 1), B12(n + 1), B12(n + 2) + B14(0), B14(1), B14(1), B14(2)}; + +uint8_t const* PopCountLUT16 = PopCountLUT16Impl; +#endif diff --git a/libs/yandex-consistent-hashing/yandex/popcount.h b/libs/yandex-consistent-hashing/yandex/popcount.h new file mode 100644 index 00000000000..b49b2fb450a --- /dev/null +++ b/libs/yandex-consistent-hashing/yandex/popcount.h @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include +using std::size_t; + +#include "bitops.h" + +#if defined(_MSC_VER) +#include +#endif + +#ifdef __SSE2__ +constexpr bool HavePOPCNTInstr = true; +#else +constexpr bool HavePOPCNTInstr = false; +#pragma GCC warning "SSE2 is not detected, PopCount function will be too slow" +#endif + +static inline uint32_t PopCountImpl(uint8_t n) { + extern uint8_t const* PopCountLUT8; + return PopCountLUT8[n]; +} + +static inline uint32_t PopCountImpl(uint16_t n) { +#if defined(_MSC_VER) + return __popcnt16(n); +#else + extern uint8_t const* PopCountLUT16; + return PopCountLUT16[n]; +#endif +} + +static inline uint32_t PopCountImpl(uint32_t n) { +#if defined(_MSC_VER) + return __popcnt(n); +#else +#if defined(__x86_64__) + + if (HavePOPCNTInstr) { + uint32_t r; + + __asm__("popcnt %1, %0;" + : "=r"(r) + : "r"(n) + :); + + return r; + } +#endif + + return PopCountImpl((uint16_t)LO_16(n)) + PopCountImpl((uint16_t)HI_16(n)); +#endif +} + +static inline uint32_t PopCountImpl(uint64_t n) { +#if defined(_MSC_VER) && !defined(_i386_) + return __popcnt64(n); +#else +#if defined(__x86_64__) + + if (HavePOPCNTInstr) { + uint64_t r; + + __asm__("popcnt %1, %0;" + : "=r"(r) + : "r"(n) + :); + + return r; + } +#endif + + return PopCountImpl((uint32_t)LO_32(n)) + PopCountImpl((uint32_t)HI_32(n)); +#endif +} + +template +static inline uint32_t PopCount(T n) { + using TCvt = std::make_unsigned_t>; + + return PopCountImpl(static_cast(n)); +}