diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp index e478c591d16..67351d9696d 100644 --- a/dbms/programs/copier/ClusterCopier.cpp +++ b/dbms/programs/copier/ClusterCopier.cpp @@ -1,7 +1,6 @@ #include "ClusterCopier.h" #include - #include #include #include @@ -13,14 +12,11 @@ #include #include #include - #include #include - #include #include #include - #include #include #include @@ -61,6 +57,7 @@ #include #include #include +#include #include #include #include @@ -897,6 +894,28 @@ public: } } + void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force) + { + auto local_task_description_path = task_path + "/description"; + + String task_config_str; + { + ReadBufferFromFile in(task_file); + readStringUntilEOF(task_config_str, in); + } + if (task_config_str.empty()) + return; + + auto zookeeper = context.getZooKeeper(); + + zookeeper->createAncestors(local_task_description_path); + auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent); + if (code && force) + zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent); + + LOG_DEBUG(log, "Task description " << ((code && !force) ? "not " : "") << "uploaded to " << local_task_description_path << " with result " << code << " ("<< zookeeper->error2string(code) << ")"); + } + void reloadTaskDescription() { auto zookeeper = context.getZooKeeper(); @@ -2107,6 +2126,10 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options) options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper") .argument("task-path").binding("task-path")); + options.addOption(Poco::Util::Option("task-file", "", "path to the task file to upload to ZooKeeper at task-path") + .argument("task-file").binding("task-file")); + options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload of the task-file even if the node already exists") + .argument("task-upload-force").binding("task-upload-force")); options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors") .binding("safe-mode")); options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with specified probability (used to test partition state recovering)") @@ -2157,6 +2180,11 @@ void ClusterCopierApp::mainImpl() auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context); copier->setSafeMode(is_safe_mode); copier->setCopyFaultProbability(copy_fault_probability); + + auto task_file = config().getString("task-file", ""); + if (!task_file.empty()) + copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false)); + copier->init(); copier->process(); } diff --git a/dbms/src/AggregateFunctions/AggregateFunctionLeastSqr.cpp b/dbms/src/AggregateFunctions/AggregateFunctionLeastSqr.cpp new file mode 100644 index 00000000000..18474a7a7d4 --- /dev/null +++ b/dbms/src/AggregateFunctions/AggregateFunctionLeastSqr.cpp @@ -0,0 +1,85 @@ +#include + +#include +#include + + +namespace DB +{ + +namespace +{ + +AggregateFunctionPtr createAggregateFunctionLeastSqr( + const String & name, + const DataTypes & arguments, + const Array & params +) +{ + assertNoParameters(name, params); + assertBinary(name, arguments); + + const IDataType * x_arg = arguments.front().get(); + + WhichDataType which_x { + x_arg + }; + + const IDataType * y_arg = arguments.back().get(); + + WhichDataType which_y { + y_arg + };
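+    /// The FOR_LEASTSQR_TYPES/DISPATCH macros below expand into one branch per
+    /// (X, Y) type pair. A sketch of a single expansion, shown here for one
+    /// illustrative pair (the macros generate all 100 combinations):
+    ///
+    ///     if (which_x.idx == TypeIndex::UInt8 && which_y.idx == TypeIndex::Float64)
+    ///         return std::make_shared<AggregateFunctionLeastSqr<UInt8, Float64>>(arguments, params);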
+ + #define FOR_LEASTSQR_TYPES_2(M, T) \ + M(T, UInt8) \ + M(T, UInt16) \ + M(T, UInt32) \ + M(T, UInt64) \ + M(T, Int8) \ + M(T, Int16) \ + M(T, Int32) \ + M(T, Int64) \ + M(T, Float32) \ + M(T, Float64) + #define FOR_LEASTSQR_TYPES(M) \ + FOR_LEASTSQR_TYPES_2(M, UInt8) \ + FOR_LEASTSQR_TYPES_2(M, UInt16) \ + FOR_LEASTSQR_TYPES_2(M, UInt32) \ + FOR_LEASTSQR_TYPES_2(M, UInt64) \ + FOR_LEASTSQR_TYPES_2(M, Int8) \ + FOR_LEASTSQR_TYPES_2(M, Int16) \ + FOR_LEASTSQR_TYPES_2(M, Int32) \ + FOR_LEASTSQR_TYPES_2(M, Int64) \ + FOR_LEASTSQR_TYPES_2(M, Float32) \ + FOR_LEASTSQR_TYPES_2(M, Float64) + #define DISPATCH(T1, T2) \ + if (which_x.idx == TypeIndex::T1 && which_y.idx == TypeIndex::T2) \ + return std::make_shared<AggregateFunctionLeastSqr<T1, T2>>( \ + arguments, \ + params \ + ); + + FOR_LEASTSQR_TYPES(DISPATCH) + + #undef FOR_LEASTSQR_TYPES_2 + #undef FOR_LEASTSQR_TYPES + #undef DISPATCH + + throw Exception( + "Illegal types (" + + x_arg->getName() + ", " + y_arg->getName() + + ") of arguments of aggregate function " + name + + ", must be Native Ints, Native UInts or Floats", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + ); +} + +} + +void registerAggregateFunctionLeastSqr(AggregateFunctionFactory & factory) +{ + factory.registerFunction("leastSqr", createAggregateFunctionLeastSqr); +} + +} diff --git a/dbms/src/AggregateFunctions/AggregateFunctionLeastSqr.h b/dbms/src/AggregateFunctions/AggregateFunctionLeastSqr.h new file mode 100644 index 00000000000..fd0b65c051f --- /dev/null +++ b/dbms/src/AggregateFunctions/AggregateFunctionLeastSqr.h @@ -0,0 +1,195 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +template <typename X, typename Y, typename Ret> +struct AggregateFunctionLeastSqrData final +{ + size_t count = 0; + Ret sum_x = 0; + Ret sum_y = 0; + Ret sum_xx = 0; + Ret sum_xy = 0; + + void add(X x, Y y) + { + count += 1; + sum_x += x; + sum_y += y; + sum_xx += x * x; + sum_xy += x * y; + } + + void merge(const AggregateFunctionLeastSqrData & other) + { + count += other.count; + sum_x += other.sum_x; + sum_y += other.sum_y; + sum_xx += other.sum_xx; + sum_xy += other.sum_xy; + } + + void serialize(WriteBuffer & buf) const + { + writeBinary(count, buf); + writeBinary(sum_x, buf); + writeBinary(sum_y, buf); + writeBinary(sum_xx, buf); + writeBinary(sum_xy, buf); + } + + void deserialize(ReadBuffer & buf) + { + readBinary(count, buf); + readBinary(sum_x, buf); + readBinary(sum_y, buf); + readBinary(sum_xx, buf); + readBinary(sum_xy, buf); + } + + Ret getK() const + { + Ret divisor = sum_xx * count - sum_x * sum_x; + + if (divisor == 0) + return std::numeric_limits<Ret>::quiet_NaN(); + + return (sum_xy * count - sum_x * sum_y) / divisor; + } + + Ret getB(Ret k) const + { + if (count == 0) + return std::numeric_limits<Ret>::quiet_NaN(); + + return (sum_y - k * sum_x) / count; + } +}; + +/// Calculates simple linear regression parameters. +/// Result is a tuple (k, b) for y = k * x + b equation, solved by least squares approximation.
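+/// A hand-worked example of the formulas in getK()/getB() above (not part of the
+/// original patch): for input points (1,1), (2,2), (3,3) the accumulated state is
+///     count = 3, sum_x = 6, sum_y = 6, sum_xx = 14, sum_xy = 14,
+/// so k = (sum_xy * count - sum_x * sum_y) / (sum_xx * count - sum_x * sum_x)
+///       = (42 - 36) / (42 - 36) = 1
+/// and b = (sum_y - k * sum_x) / count = (6 - 6) / 3 = 0, i.e. the fitted line is y = 1 * x + 0.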
+template <typename X, typename Y, typename Ret = Float64> +class AggregateFunctionLeastSqr final : public IAggregateFunctionDataHelper< + AggregateFunctionLeastSqrData<X, Y, Ret>, + AggregateFunctionLeastSqr<X, Y, Ret> +> +{ +public: + AggregateFunctionLeastSqr( + const DataTypes & arguments, + const Array & params + ): + IAggregateFunctionDataHelper< + AggregateFunctionLeastSqrData<X, Y, Ret>, + AggregateFunctionLeastSqr<X, Y, Ret> + > {arguments, params} + { + // note: the arguments have already been checked by the factory + } + + String getName() const override + { + return "leastSqr"; + } + + const char * getHeaderFilePath() const override + { + return __FILE__; + } + + void add( + AggregateDataPtr place, + const IColumn ** columns, + size_t row_num, + Arena * + ) const override + { + auto col_x { + static_cast<const ColumnVector<X> *>(columns[0]) + }; + auto col_y { + static_cast<const ColumnVector<Y> *>(columns[1]) + }; + + X x = col_x->getData()[row_num]; + Y y = col_y->getData()[row_num]; + + this->data(place).add(x, y); + } + + void merge( + AggregateDataPtr place, + ConstAggregateDataPtr rhs, Arena * + ) const override + { + this->data(place).merge(this->data(rhs)); + } + + void serialize( + ConstAggregateDataPtr place, + WriteBuffer & buf + ) const override + { + this->data(place).serialize(buf); + } + + void deserialize( + AggregateDataPtr place, + ReadBuffer & buf, Arena * + ) const override + { + this->data(place).deserialize(buf); + } + + DataTypePtr getReturnType() const override + { + DataTypes types { + std::make_shared<DataTypeNumber<Ret>>(), + std::make_shared<DataTypeNumber<Ret>>(), + }; + + Strings names { + "k", + "b", + }; + + return std::make_shared<DataTypeTuple>( + std::move(types), + std::move(names) + ); + } + + void insertResultInto( + ConstAggregateDataPtr place, + IColumn & to + ) const override + { + Ret k = this->data(place).getK(); + Ret b = this->data(place).getB(k); + + auto & col_tuple = static_cast<ColumnTuple &>(to); + auto & col_k = static_cast<ColumnVector<Ret> &>(col_tuple.getColumn(0)); + auto & col_b = static_cast<ColumnVector<Ret> &>(col_tuple.getColumn(1)); + + col_k.getData().push_back(k); + col_b.getData().push_back(b); + } +}; + +} diff --git a/dbms/src/AggregateFunctions/AggregateFunctionUniq.h b/dbms/src/AggregateFunctions/AggregateFunctionUniq.h index 62eb1db8115..bb292091788 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionUniq.h @@ -16,7 +16,6 @@ #include #include #include -#include #include #include diff --git a/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp b/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp index 0ef138119f9..2d5a0eafc07 100644 --- a/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp +++ b/dbms/src/AggregateFunctions/registerAggregateFunctions.cpp @@ -29,6 +29,7 @@ void registerAggregateFunctionsBitwise(AggregateFunctionFactory &); void registerAggregateFunctionsBitmap(AggregateFunctionFactory &); void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &); void registerAggregateFunctionEntropy(AggregateFunctionFactory &); +void registerAggregateFunctionLeastSqr(AggregateFunctionFactory &); void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &); void registerAggregateFunctionCombinatorArray(AggregateFunctionCombinatorFactory &); @@ -69,6 +70,7 @@ void registerAggregateFunctions() registerAggregateFunctionHistogram(factory); registerAggregateFunctionRetention(factory); registerAggregateFunctionEntropy(factory); + registerAggregateFunctionLeastSqr(factory); } { diff --git a/dbms/src/Common/AlignedBuffer.h b/dbms/src/Common/AlignedBuffer.h index 6534d7dc0ef..0d9ecb61f2b 100644 --- a/dbms/src/Common/AlignedBuffer.h +++ 
b/dbms/src/Common/AlignedBuffer.h @@ -10,7 +10,7 @@ namespace DB /** Aligned piece of memory. * It can only be allocated and destroyed. - * MemoryTracker is not used. It is intended for small pieces of memory. + * MemoryTracker is not used. AlignedBuffer is intended for small pieces of memory. */ class AlignedBuffer : private boost::noncopyable { diff --git a/dbms/src/Common/Allocator.cpp b/dbms/src/Common/Allocator.cpp deleted file mode 100644 index d67130a5082..00000000000 --- a/dbms/src/Common/Allocator.cpp +++ /dev/null @@ -1,190 +0,0 @@ -#include - -#if !defined(__APPLE__) && !defined(__FreeBSD__) -#include -#endif - -#include -#include -#include - -#include -#ifdef THREAD_SANITIZER - /// Thread sanitizer does not intercept mremap. The usage of mremap will lead to false positives. - #define DISABLE_MREMAP 1 -#endif -#include - -#include -#include -#include -#include - - -/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS -#ifndef MAP_ANONYMOUS -#define MAP_ANONYMOUS MAP_ANON -#endif - - -namespace DB -{ -namespace ErrorCodes -{ - extern const int BAD_ARGUMENTS; - extern const int CANNOT_ALLOCATE_MEMORY; - extern const int CANNOT_MUNMAP; - extern const int CANNOT_MREMAP; -} -} - - -/** Many modern allocators (for example, tcmalloc) do not do a mremap for realloc, - * even in case of large enough chunks of memory. - * Although this allows you to increase performance and reduce memory consumption during realloc. - * To fix this, we do mremap manually if the chunk of memory is large enough. - * The threshold (64 MB) is chosen quite large, since changing the address space is - * very slow, especially in the case of a large number of threads. - * We expect that the set of operations mmap/something to do/mremap can only be performed about 1000 times per second. - * - * PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB. - */ -#ifdef NDEBUG - static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20); -#else - /// In debug build, use small mmap threshold to reproduce more memory stomping bugs. - /// Along with ASLR it will hopefully detect more issues than ASan. - /// The program may fail due to the limit on number of memory mappings. - static constexpr size_t MMAP_THRESHOLD = 4096; -#endif - -static constexpr size_t MMAP_MIN_ALIGNMENT = 4096; -static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; - - -template -void * Allocator::mmap_hint() -{ -#if ALLOCATOR_ASLR - return reinterpret_cast(std::uniform_int_distribution(0x100000000000UL, 0x700000000000UL)(rng)); -#else - return nullptr; -#endif -} - - -template -void * Allocator::alloc(size_t size, size_t alignment) -{ - CurrentMemoryTracker::alloc(size); - - void * buf; - - if (size >= MMAP_THRESHOLD) - { - if (alignment > MMAP_MIN_ALIGNMENT) - throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating " - + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS); - - buf = mmap(mmap_hint(), size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (MAP_FAILED == buf) - DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); - - /// No need for zero-fill, because mmap guarantees it. 
- } - else - { - if (alignment <= MALLOC_MIN_ALIGNMENT) - { - if (clear_memory) - buf = ::calloc(size, 1); - else - buf = ::malloc(size); - - if (nullptr == buf) - DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); - } - else - { - buf = nullptr; - int res = posix_memalign(&buf, alignment, size); - - if (0 != res) - DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res); - - if (clear_memory) - memset(buf, 0, size); - } - } - - return buf; -} - - -template -void Allocator::free(void * buf, size_t size) -{ - if (size >= MMAP_THRESHOLD) - { - if (0 != munmap(buf, size)) - DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP); - } - else - { - ::free(buf); - } - - CurrentMemoryTracker::free(size); -} - - -template -void * Allocator::realloc(void * buf, size_t old_size, size_t new_size, size_t alignment) -{ - if (old_size == new_size) - { - /// nothing to do. - /// BTW, it's not possible to change alignment while doing realloc. - } - else if (old_size < MMAP_THRESHOLD && new_size < MMAP_THRESHOLD && alignment <= MALLOC_MIN_ALIGNMENT) - { - /// Resize malloc'd memory region with no special alignment requirement. - CurrentMemoryTracker::realloc(old_size, new_size); - - void * new_buf = ::realloc(buf, new_size); - if (nullptr == new_buf) - DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); - - buf = new_buf; - if (clear_memory && new_size > old_size) - memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); - } - else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) - { - /// Resize mmap'd memory region. - CurrentMemoryTracker::realloc(old_size, new_size); - - // On apple and freebsd self-implemented mremap used (common/mremap.h) - buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (MAP_FAILED == buf) - DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP); - - /// No need for zero-fill, because mmap guarantees it. - } - else - { - /// All other cases that requires a copy. MemoryTracker is called inside 'alloc', 'free' methods. - - void * new_buf = alloc(new_size, alignment); - memcpy(new_buf, buf, std::min(old_size, new_size)); - free(buf, old_size); - buf = new_buf; - } - - return buf; -} - - -/// Explicit template instantiations. -template class Allocator; -template class Allocator; diff --git a/dbms/src/Common/Allocator.h b/dbms/src/Common/Allocator.h index d2a81f77b62..8b0a4b86a69 100644 --- a/dbms/src/Common/Allocator.h +++ b/dbms/src/Common/Allocator.h @@ -10,11 +10,88 @@ #define ALLOCATOR_ASLR 1 #endif -#if ALLOCATOR_ASLR - #include - #include +#include +#include + +#if !defined(__APPLE__) && !defined(__FreeBSD__) +#include #endif +#include +#include +#include + +#include +#ifdef THREAD_SANITIZER + /// Thread sanitizer does not intercept mremap. The usage of mremap will lead to false positives. 
+ #define DISABLE_MREMAP 1 +#endif +#include + +#include +#include +#include + + +/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS +#ifndef MAP_ANONYMOUS +#define MAP_ANONYMOUS MAP_ANON +#endif + + +/** Many modern allocators (for example, tcmalloc) do not do a mremap for realloc, + * even in case of large enough chunks of memory. + * Although this allows you to increase performance and reduce memory consumption during realloc. + * To fix this, we do mremap manually if the chunk of memory is large enough. + * The threshold (64 MB) is chosen quite large, since changing the address space is + * very slow, especially in the case of a large number of threads. + * We expect that the set of operations mmap/something to do/mremap can only be performed about 1000 times per second. + * + * PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB. + */ +#ifdef NDEBUG + static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20); +#else + /// In debug build, use small mmap threshold to reproduce more memory stomping bugs. + /// Along with ASLR it will hopefully detect more issues than ASan. + /// The program may fail due to the limit on number of memory mappings. + static constexpr size_t MMAP_THRESHOLD = 4096; +#endif + +static constexpr size_t MMAP_MIN_ALIGNMENT = 4096; +static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int CANNOT_ALLOCATE_MEMORY; + extern const int CANNOT_MUNMAP; + extern const int CANNOT_MREMAP; +} +} + +namespace AllocatorHints +{ +struct DefaultHint +{ + void * mmap_hint() + { + return nullptr; + } +}; + +struct RandomHint +{ + void * mmap_hint() + { + return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(rng)); + } +private: + pcg64 rng{randomSeed()}; +}; +} /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. @@ -23,31 +100,126 @@ * - passing the size into the `free` method; * - by the presence of the `alignment` argument; * - the possibility of zeroing memory (used in hash tables); + * - the Hint class, which provides the address hint passed to mmap; + * - the mmap_threshold above which allocations are served by mmap. */ -template -class Allocator +template <bool clear_memory_, typename Hint, size_t mmap_threshold> +class AllocatorWithHint : Hint { -#if ALLOCATOR_ASLR -private: - pcg64 rng{randomSeed()}; -#endif - void * mmap_hint(); - protected: static constexpr bool clear_memory = clear_memory_; public: /// Allocate memory range. - void * alloc(size_t size, size_t alignment = 0); + void * alloc(size_t size, size_t alignment = 0) + { + CurrentMemoryTracker::alloc(size); + + void * buf; + + if (size >= mmap_threshold) + { + if (alignment > MMAP_MIN_ALIGNMENT) + throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating " + + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS); + + buf = mmap(Hint::mmap_hint(), size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if (MAP_FAILED == buf) + DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); + + /// No need for zero-fill, because mmap guarantees it.
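+            /// The address hint passed to mmap above comes from the Hint policy:
+            /// RandomHint (used when ALLOCATOR_ASLR is defined) returns a random
+            /// address to spread large allocations across the address space,
+            /// while DefaultHint returns nullptr and lets the kernel choose.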
+ } + else + { + if (alignment <= MALLOC_MIN_ALIGNMENT) + { + if constexpr (clear_memory) + buf = ::calloc(size, 1); + else + buf = ::malloc(size); + + if (nullptr == buf) + DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); + } + else + { + buf = nullptr; + int res = posix_memalign(&buf, alignment, size); + + if (0 != res) + DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res); + + if (clear_memory) + memset(buf, 0, size); + } + } + return buf; + } /// Free memory range. - void free(void * buf, size_t size); + void free(void * buf, size_t size) + { + if (size >= mmap_threshold) + { + if (0 != munmap(buf, size)) + DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP); + } + else + { + ::free(buf); + } + + CurrentMemoryTracker::free(size); + } /** Enlarge memory range. * Data from old range is moved to the beginning of new range. * Address of memory range could change. */ - void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0); + void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0) + { + if (old_size == new_size) + { + /// nothing to do. + /// BTW, it's not possible to change alignment while doing realloc. + } + else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT) + { + /// Resize malloc'd memory region with no special alignment requirement. + CurrentMemoryTracker::realloc(old_size, new_size); + + void * new_buf = ::realloc(buf, new_size); + if (nullptr == new_buf) + DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); + + buf = new_buf; + if (clear_memory && new_size > old_size) + memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size); + } + else if (old_size >= mmap_threshold && new_size >= mmap_threshold) + { + /// Resize mmap'd memory region. + CurrentMemoryTracker::realloc(old_size, new_size); + + // On Apple and FreeBSD a self-implemented mremap is used (see common/mremap.h). + buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if (MAP_FAILED == buf) + DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP); + + /// No need for zero-fill, because mmap guarantees it. + } + else + { + /// All other cases require a copy. MemoryTracker is called inside the 'alloc' and 'free' methods. + + void * new_buf = alloc(new_size, alignment); + memcpy(new_buf, buf, std::min(old_size, new_size)); + free(buf, old_size); + buf = new_buf; + } + + return buf; + } protected: static constexpr size_t getStackThreshold() @@ -56,6 +228,13 @@ protected: } }; +#if ALLOCATOR_ASLR +template <bool clear_memory_> +using Allocator = AllocatorWithHint<clear_memory_, AllocatorHints::RandomHint, MMAP_THRESHOLD>; +#else +template <bool clear_memory_> +using Allocator = AllocatorWithHint<clear_memory_, AllocatorHints::DefaultHint, MMAP_THRESHOLD>; +#endif /** When using AllocatorWithStackMemory, located on the stack, * GCC 4.9 mistakenly assumes that we can call `free` from a pointer to the stack.
diff --git a/dbms/src/Common/Arena.h b/dbms/src/Common/Arena.h index 30a4f4fcb2e..e8e84a1e084 100644 --- a/dbms/src/Common/Arena.h +++ b/dbms/src/Common/Arena.h @@ -49,7 +49,7 @@ private: ProfileEvents::increment(ProfileEvents::ArenaAllocChunks); ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_); - begin = reinterpret_cast(Allocator::alloc(size_)); + begin = reinterpret_cast(Allocator::alloc(size_)); pos = begin; end = begin + size_ - pad_right; prev = prev_; @@ -57,7 +57,7 @@ private: ~Chunk() { - Allocator::free(begin, size()); + Allocator::free(begin, size()); if (prev) delete prev; diff --git a/dbms/src/Common/ArenaWithFreeLists.h b/dbms/src/Common/ArenaWithFreeLists.h index 599e8779941..0fb0d47f8c1 100644 --- a/dbms/src/Common/ArenaWithFreeLists.h +++ b/dbms/src/Common/ArenaWithFreeLists.h @@ -55,7 +55,7 @@ public: char * alloc(const size_t size) { if (size > max_fixed_block_size) - return static_cast(Allocator::alloc(size)); + return static_cast(Allocator::alloc(size)); /// find list of required size const auto list_idx = findFreeListIndex(size); @@ -76,7 +76,7 @@ public: void free(char * ptr, const size_t size) { if (size > max_fixed_block_size) - return Allocator::free(ptr, size); + return Allocator::free(ptr, size); /// find list of required size const auto list_idx = findFreeListIndex(size); diff --git a/dbms/src/Common/SortedLookupPODArray.h b/dbms/src/Common/SortedLookupPODArray.h deleted file mode 100644 index d9b03f5704d..00000000000 --- a/dbms/src/Common/SortedLookupPODArray.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include -//#include - -namespace DB -{ - -/** - * This class is intended to push sortable data into. - * When looking up values the container ensures that it is sorted for log(N) lookup - * - * Note, this is only efficient when the insertions happen in one stage, followed by all retrievals - * This way the data only gets sorted once. - */ - -template -class SortedLookupPODArray -{ -public: - using Base = std::vector; - //using Base = PaddedPODArray; - - template - void insert(U && x, TAllocatorParams &&... allocator_params) - { - array.push_back(std::forward(x), std::forward(allocator_params)...); - sorted = false; - } - - typename Base::const_iterator upper_bound(const T & k) - { - if (!sorted) - sort(); - return std::upper_bound(array.cbegin(), array.cend(), k); - } - - typename Base::const_iterator cbegin() const { return array.cbegin(); } - typename Base::const_iterator cend() const { return array.cend(); } - -private: - Base array; - bool sorted = false; - - void sort() - { - std::sort(array.begin(), array.end()); - sorted = true; - } -}; - -} diff --git a/dbms/src/Compression/CachedCompressedReadBuffer.cpp b/dbms/src/Compression/CachedCompressedReadBuffer.cpp index 4660bce2074..b39d04cf03f 100644 --- a/dbms/src/Compression/CachedCompressedReadBuffer.cpp +++ b/dbms/src/Compression/CachedCompressedReadBuffer.cpp @@ -35,7 +35,7 @@ bool CachedCompressedReadBuffer::nextImpl() UInt128 key = cache->hash(path, file_pos); owned_cell = cache->get(key); - if (!owned_cell || !codec) + if (!owned_cell) { /// If not, read it from the file. 
initInput(); @@ -49,21 +49,22 @@ bool CachedCompressedReadBuffer::nextImpl() if (owned_cell->compressed_size) { - owned_cell->data.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer()); + owned_cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer(); + owned_cell->data.resize(size_decompressed + owned_cell->additional_bytes); decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum); - /// Put data into cache. - cache->set(key, owned_cell); } + + /// Put data into the cache. + /// NOTE: Do this even if we read nothing (compressed_size == 0), + /// because this information can be reused later to avoid reopening the file. + cache->set(key, owned_cell); } if (owned_cell->data.size() == 0) - { - owned_cell = nullptr; return false; - } - working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - codec->getAdditionalSizeAtTheEndOfBuffer()); + working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes); file_pos += owned_cell->compressed_size; diff --git a/dbms/src/Core/BackgroundSchedulePool.cpp b/dbms/src/Core/BackgroundSchedulePool.cpp index ce67c895234..ee63fdbadff 100644 --- a/dbms/src/Core/BackgroundSchedulePool.cpp +++ b/dbms/src/Core/BackgroundSchedulePool.cpp @@ -23,20 +23,21 @@ namespace DB class TaskNotification final : public Poco::Notification { public: - explicit TaskNotification(const BackgroundSchedulePool::TaskInfoPtr & task) : task(task) {} + explicit TaskNotification(const BackgroundSchedulePoolTaskInfoPtr & task) : task(task) {} void execute() { task->execute(); } private: - BackgroundSchedulePool::TaskInfoPtr task; + BackgroundSchedulePoolTaskInfoPtr task; }; -BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_) - : pool(pool_) , log_name(log_name_) , function(function_) +BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo( + BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_) + : pool(pool_), log_name(log_name_), function(function_) { } -bool BackgroundSchedulePool::TaskInfo::schedule() +bool BackgroundSchedulePoolTaskInfo::schedule() { std::lock_guard lock(schedule_mutex); @@ -47,7 +48,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule() return true; } -bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms) +bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms) { std::lock_guard lock(schedule_mutex); @@ -58,7 +59,7 @@ bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms) return true; } -void BackgroundSchedulePool::TaskInfo::deactivate() +void BackgroundSchedulePoolTaskInfo::deactivate() { std::lock_guard lock_exec(exec_mutex); std::lock_guard lock_schedule(schedule_mutex); @@ -73,13 +74,13 @@ void BackgroundSchedulePool::TaskInfo::deactivate() pool.cancelDelayedTask(shared_from_this(), lock_schedule); } -void BackgroundSchedulePool::TaskInfo::activate() +void BackgroundSchedulePoolTaskInfo::activate() { std::lock_guard lock(schedule_mutex); deactivated = false; } -bool BackgroundSchedulePool::TaskInfo::activateAndSchedule() +bool BackgroundSchedulePoolTaskInfo::activateAndSchedule() { std::lock_guard lock(schedule_mutex); @@ -91,7 +92,7 @@ bool BackgroundSchedulePool::TaskInfo::activateAndSchedule() return true; } -void BackgroundSchedulePool::TaskInfo::execute() +void BackgroundSchedulePoolTaskInfo::execute() { Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask}; @@ -131,7 +132,7 @@ void BackgroundSchedulePool::TaskInfo::execute() } } -void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard & schedule_mutex_lock) +void BackgroundSchedulePoolTaskInfo::scheduleImpl(std::lock_guard & schedule_mutex_lock) { scheduled = true; @@ -145,7 +146,7 @@ void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard pool.queue.enqueueNotification(new TaskNotification(shared_from_this())); } -Coordination::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback() +Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback() { return [t = shared_from_this()](const Coordination::WatchResponse &) { diff --git a/dbms/src/Core/BackgroundSchedulePool.h b/dbms/src/Core/BackgroundSchedulePool.h index 11f2c5195e6..f2627366da7 100644 --- a/dbms/src/Core/BackgroundSchedulePool.h +++ b/dbms/src/Core/BackgroundSchedulePool.h @@ -20,6 +20,8 @@ namespace DB { class TaskNotification; +class BackgroundSchedulePoolTaskInfo; +class BackgroundSchedulePoolTaskHolder; /** Executes functions scheduled at a specific point in time. @@ -35,84 +37,14 @@ class TaskNotification; class BackgroundSchedulePool { public: - class TaskInfo; + friend class BackgroundSchedulePoolTaskInfo; + + using TaskInfo = BackgroundSchedulePoolTaskInfo; using TaskInfoPtr = std::shared_ptr; using TaskFunc = std::function; + using TaskHolder = BackgroundSchedulePoolTaskHolder; using DelayedTasks = std::multimap; - class TaskInfo : public std::enable_shared_from_this, private boost::noncopyable - { - public: - TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_); - - /// Schedule for execution as soon as possible (if not already scheduled). - /// If the task was already scheduled with delay, the delay will be ignored. - bool schedule(); - - /// Schedule for execution after specified delay. - bool scheduleAfter(size_t ms); - - /// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task. - void deactivate(); - - void activate(); - - /// Atomically activate task and schedule it for execution. - bool activateAndSchedule(); - - /// get Coordination::WatchCallback needed for notifications from ZooKeeper watches. - Coordination::WatchCallback getWatchCallback(); - - private: - friend class TaskNotification; - friend class BackgroundSchedulePool; - - void execute(); - - void scheduleImpl(std::lock_guard & schedule_mutex_lock); - - BackgroundSchedulePool & pool; - std::string log_name; - TaskFunc function; - - std::mutex exec_mutex; - std::mutex schedule_mutex; - - /// Invariants: - /// * If deactivated is true then scheduled, delayed and executing are all false. - /// * scheduled and delayed cannot be true at the same time. - bool deactivated = false; - bool scheduled = false; - bool delayed = false; - bool executing = false; - - /// If the task is scheduled with delay, points to element of delayed_tasks. 
- DelayedTasks::iterator iterator; - }; - - class TaskHolder - { - public: - TaskHolder() = default; - explicit TaskHolder(const TaskInfoPtr & task_info_) : task_info(task_info_) {} - TaskHolder(const TaskHolder & other) = delete; - TaskHolder(TaskHolder && other) noexcept = default; - TaskHolder & operator=(const TaskHolder & other) noexcept = delete; - TaskHolder & operator=(TaskHolder && other) noexcept = default; - - ~TaskHolder() - { - if (task_info) - task_info->deactivate(); - } - - TaskInfo * operator->() { return task_info.get(); } - const TaskInfo * operator->() const { return task_info.get(); } - - private: - TaskInfoPtr task_info; - }; - TaskHolder createTask(const std::string & log_name, const TaskFunc & function); size_t getNumberOfThreads() const { return size; } @@ -153,4 +85,81 @@ private: void attachToThreadGroup(); }; + +class BackgroundSchedulePoolTaskInfo : public std::enable_shared_from_this, private boost::noncopyable +{ +public: + BackgroundSchedulePoolTaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_); + + /// Schedule for execution as soon as possible (if not already scheduled). + /// If the task was already scheduled with delay, the delay will be ignored. + bool schedule(); + + /// Schedule for execution after specified delay. + bool scheduleAfter(size_t ms); + + /// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task. + void deactivate(); + + void activate(); + + /// Atomically activate task and schedule it for execution. + bool activateAndSchedule(); + + /// get Coordination::WatchCallback needed for notifications from ZooKeeper watches. + Coordination::WatchCallback getWatchCallback(); + +private: + friend class TaskNotification; + friend class BackgroundSchedulePool; + + void execute(); + + void scheduleImpl(std::lock_guard & schedule_mutex_lock); + + BackgroundSchedulePool & pool; + std::string log_name; + BackgroundSchedulePool::TaskFunc function; + + std::mutex exec_mutex; + std::mutex schedule_mutex; + + /// Invariants: + /// * If deactivated is true then scheduled, delayed and executing are all false. + /// * scheduled and delayed cannot be true at the same time. + bool deactivated = false; + bool scheduled = false; + bool delayed = false; + bool executing = false; + + /// If the task is scheduled with delay, points to element of delayed_tasks. 
+ BackgroundSchedulePool::DelayedTasks::iterator iterator; +}; + +using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr; + + +class BackgroundSchedulePoolTaskHolder +{ +public: + BackgroundSchedulePoolTaskHolder() = default; + explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {} + BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete; + BackgroundSchedulePoolTaskHolder(BackgroundSchedulePoolTaskHolder && other) noexcept = default; + BackgroundSchedulePoolTaskHolder & operator=(const BackgroundSchedulePoolTaskHolder & other) noexcept = delete; + BackgroundSchedulePoolTaskHolder & operator=(BackgroundSchedulePoolTaskHolder && other) noexcept = default; + + ~BackgroundSchedulePoolTaskHolder() + { + if (task_info) + task_info->deactivate(); + } + + BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); } + const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); } + +private: + BackgroundSchedulePoolTaskInfoPtr task_info; +}; + } diff --git a/dbms/src/DataStreams/AsynchronousBlockInputStream.h b/dbms/src/DataStreams/AsynchronousBlockInputStream.h index 53e265fdddd..6cfa247ab44 100644 --- a/dbms/src/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/src/DataStreams/AsynchronousBlockInputStream.h @@ -5,8 +5,6 @@ #include #include #include -#include -#include namespace CurrentMetrics diff --git a/dbms/src/DataStreams/BlockIO.h b/dbms/src/DataStreams/BlockIO.h index 5ebfa45d179..4618b183d48 100644 --- a/dbms/src/DataStreams/BlockIO.h +++ b/dbms/src/DataStreams/BlockIO.h @@ -43,6 +43,9 @@ struct BlockIO BlockIO & operator= (const BlockIO & rhs) { + if (this == &rhs) + return *this; + out.reset(); in.reset(); process_list_entry.reset(); diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index bde030d8afa..96ea9112e1d 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h index 44de41b2802..9fe322c3f43 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -8,8 +8,6 @@ #include -class MemoryTracker; - namespace DB { diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index b7402a45793..9c7a1fc6928 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -11,7 +11,6 @@ #include #include #include -#include #include #include diff --git a/dbms/src/DataTypes/DataTypeDomainIPv4AndIPv6.cpp b/dbms/src/DataTypes/DataTypeDomainIPv4AndIPv6.cpp index 339409caf24..873dbde506b 100644 --- a/dbms/src/DataTypes/DataTypeDomainIPv4AndIPv6.cpp +++ b/dbms/src/DataTypes/DataTypeDomainIPv4AndIPv6.cpp @@ -20,7 +20,7 @@ namespace ErrorCodes namespace { -class DataTypeDomanIPv4 : public DataTypeDomainWithSimpleSerialization +class DataTypeDomainIPv4 : public DataTypeDomainWithSimpleSerialization { public: const char * getName() const override @@ -63,7 +63,7 @@ public: } }; -class DataTypeDomanIPv6 : 
public DataTypeDomainWithSimpleSerialization +class DataTypeDomainIPv6 : public DataTypeDomainWithSimpleSerialization { public: const char * getName() const override @@ -111,8 +111,8 @@ public: void registerDataTypeDomainIPv4AndIPv6(DataTypeFactory & factory) { - factory.registerDataTypeDomain("UInt32", std::make_unique()); - factory.registerDataTypeDomain("FixedString(16)", std::make_unique()); + factory.registerDataTypeDomain("UInt32", std::make_unique()); + factory.registerDataTypeDomain("FixedString(16)", std::make_unique()); } } // namespace DB diff --git a/dbms/src/DataTypes/DataTypeLowCardinality.cpp b/dbms/src/DataTypes/DataTypeLowCardinality.cpp index 98b662d8fe8..504a451741d 100644 --- a/dbms/src/DataTypes/DataTypeLowCardinality.cpp +++ b/dbms/src/DataTypes/DataTypeLowCardinality.cpp @@ -690,10 +690,9 @@ void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams( }; if (!settings.continuous_reading) + { low_cardinality_state->num_pending_rows = 0; - if (!settings.continuous_reading) - { /// Remember in state that some granules were skipped and we need to update dictionary. low_cardinality_state->need_update_dictionary = true; } diff --git a/dbms/src/Functions/isValidUTF8.cpp b/dbms/src/Functions/isValidUTF8.cpp new file mode 100644 index 00000000000..947aaa7039a --- /dev/null +++ b/dbms/src/Functions/isValidUTF8.cpp @@ -0,0 +1,306 @@ +#include +#include +#include + +#include + +#ifdef __SSE4_1__ +# include +# include +# include +#endif + +namespace DB +{ +struct ValidUTF8Impl +{ + /* + * inspired by https://github.com/cyb70289/utf8/ + * http://www.unicode.org/versions/Unicode6.0.0/ch03.pdf - page 94 + * + * Table 3-7. Well-Formed UTF-8 Byte Sequences + * + * +--------------------+------------+-------------+------------+-------------+ + * | Code Points | First Byte | Second Byte | Third Byte | Fourth Byte | + * +--------------------+------------+-------------+------------+-------------+ + * | U+0000..U+007F | 00..7F | | | | + * +--------------------+------------+-------------+------------+-------------+ + * | U+0080..U+07FF | C2..DF | 80..BF | | | + * +--------------------+------------+-------------+------------+-------------+ + * | U+0800..U+0FFF | E0 | A0..BF | 80..BF | | + * +--------------------+------------+-------------+------------+-------------+ + * | U+1000..U+CFFF | E1..EC | 80..BF | 80..BF | | + * +--------------------+------------+-------------+------------+-------------+ + * | U+D000..U+D7FF | ED | 80..9F | 80..BF | | + * +--------------------+------------+-------------+------------+-------------+ + * | U+E000..U+FFFF | EE..EF | 80..BF | 80..BF | | + * +--------------------+------------+-------------+------------+-------------+ + * | U+10000..U+3FFFF | F0 | 90..BF | 80..BF | 80..BF | + * +--------------------+------------+-------------+------------+-------------+ + * | U+40000..U+FFFFF | F1..F3 | 80..BF | 80..BF | 80..BF | + * +--------------------+------------+-------------+------------+-------------+ + * | U+100000..U+10FFFF | F4 | 80..8F | 80..BF | 80..BF | + * +--------------------+------------+-------------+------------+-------------+ + */ + + static inline UInt8 isValidUTF8Naive(const UInt8 * data, UInt64 len) + { + while (len) + { + int bytes; + const UInt8 byte1 = data[0]; + /* 00..7F */ + if (byte1 <= 0x7F) + { + bytes = 1; + } + /* C2..DF, 80..BF */ + else if (len >= 2 && byte1 >= 0xC2 && byte1 <= 0xDF && static_cast(data[1]) <= static_cast(0xBF)) + { + bytes = 2; + } + else if (len >= 3) + { + const UInt8 byte2 = data[1]; + bool byte2_ok = 
static_cast(byte2) <= static_cast(0xBF); + bool byte3_ok = static_cast(data[2]) <= static_cast(0xBF); + + if (byte2_ok && byte3_ok && + /* E0, A0..BF, 80..BF */ + ((byte1 == 0xE0 && byte2 >= 0xA0) || + /* E1..EC, 80..BF, 80..BF */ + (byte1 >= 0xE1 && byte1 <= 0xEC) || + /* ED, 80..9F, 80..BF */ + (byte1 == 0xED && byte2 <= 0x9F) || + /* EE..EF, 80..BF, 80..BF */ + (byte1 >= 0xEE && byte1 <= 0xEF))) + { + bytes = 3; + } + else if (len >= 4) + { + bool byte4_ok = static_cast(data[3]) <= static_cast(0xBF); + if (byte2_ok && byte3_ok && byte4_ok && + /* F0, 90..BF, 80..BF, 80..BF */ + ((byte1 == 0xF0 && byte2 >= 0x90) || + /* F1..F3, 80..BF, 80..BF, 80..BF */ + (byte1 >= 0xF1 && byte1 <= 0xF3) || + /* F4, 80..8F, 80..BF, 80..BF */ + (byte1 == 0xF4 && byte2 <= 0x8F))) + { + bytes = 4; + } + else + { + return false; + } + } + else + { + return false; + } + } + else + { + return false; + } + len -= bytes; + data += bytes; + } + return true; + } + +#ifndef __SSE4_1__ + static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len) { return isValidUTF8Naive(data, len); } +#else + static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len) + { + /* + * Map high nibble of "First Byte" to legal character length minus 1 + * 0x00 ~ 0xBF --> 0 + * 0xC0 ~ 0xDF --> 1 + * 0xE0 ~ 0xEF --> 2 + * 0xF0 ~ 0xFF --> 3 + */ + const __m128i first_len_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 3); + + /* Map "First Byte" to 8-th item of range table (0xC2 ~ 0xF4) */ + const __m128i first_range_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8); + + /* + * Range table, map range index to min and max values + */ + const __m128i range_min_tbl + = _mm_setr_epi8(0x00, 0x80, 0x80, 0x80, 0xA0, 0x80, 0x90, 0x80, 0xC2, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F); + + const __m128i range_max_tbl + = _mm_setr_epi8(0x7F, 0xBF, 0xBF, 0xBF, 0xBF, 0x9F, 0xBF, 0x8F, 0xF4, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80); + + /* + * Tables for fast handling of four special First Bytes(E0,ED,F0,F4), after + * which the Second Byte are not 80~BF. It contains "range index adjustment". 
+ * +------------+---------------+------------------+----------------+ + * | First Byte | original range| range adjustment | adjusted range | + * +------------+---------------+------------------+----------------+ + * | E0 | 2 | 2 | 4 | + * +------------+---------------+------------------+----------------+ + * | ED | 2 | 3 | 5 | + * +------------+---------------+------------------+----------------+ + * | F0 | 3 | 3 | 6 | + * +------------+---------------+------------------+----------------+ + * | F4 | 4 | 4 | 8 | + * +------------+---------------+------------------+----------------+ + */ + + /* index1 -> E0, index14 -> ED */ + const __m128i df_ee_tbl = _mm_setr_epi8(0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0); + + /* index1 -> F0, index5 -> F4 */ + const __m128i ef_fe_tbl = _mm_setr_epi8(0, 3, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + __m128i prev_input = _mm_set1_epi8(0); + __m128i prev_first_len = _mm_set1_epi8(0); + __m128i error = _mm_set1_epi8(0); + + auto check_packed = [&](__m128i input) noexcept + { + /* high_nibbles = input >> 4 */ + const __m128i high_nibbles = _mm_and_si128(_mm_srli_epi16(input, 4), _mm_set1_epi8(0x0F)); + + /* first_len = legal character length minus 1 */ + /* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */ + /* first_len = first_len_tbl[high_nibbles] */ + __m128i first_len = _mm_shuffle_epi8(first_len_tbl, high_nibbles); + + /* First Byte: set range index to 8 for bytes within 0xC0 ~ 0xFF */ + /* range = first_range_tbl[high_nibbles] */ + __m128i range = _mm_shuffle_epi8(first_range_tbl, high_nibbles); + + /* Second Byte: set range index to first_len */ + /* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */ + /* range |= (first_len, prev_first_len) << 1 byte */ + range = _mm_or_si128(range, _mm_alignr_epi8(first_len, prev_first_len, 15)); + + /* Third Byte: set range index to saturate_sub(first_len, 1) */ + /* 0 for 00~7F, 0 for C0~DF, 1 for E0~EF, 2 for F0~FF */ + __m128i tmp1; + __m128i tmp2; + /* tmp1 = saturate_sub(first_len, 1) */ + tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(1)); + /* tmp2 = saturate_sub(prev_first_len, 1) */ + tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(1)); + /* range |= (tmp1, tmp2) << 2 bytes */ + range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 14)); + + /* Fourth Byte: set range index to saturate_sub(first_len, 2) */ + /* 0 for 00~7F, 0 for C0~DF, 0 for E0~EF, 1 for F0~FF */ + /* tmp1 = saturate_sub(first_len, 2) */ + tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(2)); + /* tmp2 = saturate_sub(prev_first_len, 2) */ + tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(2)); + /* range |= (tmp1, tmp2) << 3 bytes */ + range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 13)); + + /* + * Now we have below range indices caluclated + * Correct cases: + * - 8 for C0~FF + * - 3 for 1st byte after F0~FF + * - 2 for 1st byte after E0~EF or 2nd byte after F0~FF + * - 1 for 1st byte after C0~DF or 2nd byte after E0~EF or + * 3rd byte after F0~FF + * - 0 for others + * Error cases: + * 9,10,11 if non ascii First Byte overlaps + * E.g., F1 80 C2 90 --> 8 3 10 2, where 10 indicates error + */ + + /* Adjust Second Byte range for special First Bytes(E0,ED,F0,F4) */ + /* Overlaps lead to index 9~15, which are illegal in range table */ + __m128i shift1, pos, range2; + /* shift1 = (input, prev_input) << 1 byte */ + shift1 = _mm_alignr_epi8(input, prev_input, 15); + pos = _mm_sub_epi8(shift1, _mm_set1_epi8(0xEF)); + /* + * shift1: | EF F0 ... FE | FF 00 ... ... DE | DF E0 ... 
EE | + * pos: | 0 1 15 | 16 17 239| 240 241 255| + * pos-240: | 0 0 0 | 0 0 0 | 0 1 15 | + * pos+112: | 112 113 127| >= 128 | >= 128 | + */ + tmp1 = _mm_subs_epu8(pos, _mm_set1_epi8(240)); + range2 = _mm_shuffle_epi8(df_ee_tbl, tmp1); + tmp2 = _mm_adds_epu8(pos, _mm_set1_epi8(112)); + range2 = _mm_add_epi8(range2, _mm_shuffle_epi8(ef_fe_tbl, tmp2)); + + range = _mm_add_epi8(range, range2); + + /* Load min and max values per calculated range index */ + __m128i minv = _mm_shuffle_epi8(range_min_tbl, range); + __m128i maxv = _mm_shuffle_epi8(range_max_tbl, range); + + /* Check value range */ + error = _mm_or_si128(error, _mm_cmplt_epi8(input, minv)); + error = _mm_or_si128(error, _mm_cmpgt_epi8(input, maxv)); + + prev_input = input; + prev_first_len = first_len; + + data += 16; + len -= 16; + }; + + while (len >= 16) + check_packed(_mm_loadu_si128(reinterpret_cast(data))); + + /// 0 <= len <= 15 for now. Reading data from data - 1 because of right padding of 15 and left padding + /// Then zero some bytes from the unknown memory and check again. + alignas(16) char buf[32]; + _mm_store_si128(reinterpret_cast<__m128i *>(buf), _mm_loadu_si128(reinterpret_cast(data - 1))); + memset(buf + len + 1, 0, 16); + check_packed(_mm_loadu_si128(reinterpret_cast<__m128i *>(buf + 1))); + + /* Reduce error vector, error_reduced = 0xFFFF if error == 0 */ + return _mm_movemask_epi8(_mm_cmpeq_epi8(error, _mm_set1_epi8(0))) == 0xFFFF; + } +#endif + + static constexpr bool is_fixed_to_constant = false; + + static void vector(const ColumnString::Chars & data, const ColumnString::Offsets & offsets, PaddedPODArray & res) + { + size_t size = offsets.size(); + size_t prev_offset = 0; + for (size_t i = 0; i < size; ++i) + { + res[i] = isValidUTF8(data.data() + prev_offset, offsets[i] - 1 - prev_offset); + prev_offset = offsets[i]; + } + } + + static void vector_fixed_to_constant(const ColumnString::Chars & /*data*/, size_t /*n*/, UInt8 & /*res*/) {} + + static void vector_fixed_to_vector(const ColumnString::Chars & data, size_t n, PaddedPODArray & res) + { + size_t size = data.size() / n; + for (size_t i = 0; i < size; ++i) + res[i] = isValidUTF8(data.data() + i * n, n); + } + + static void array(const ColumnString::Offsets &, PaddedPODArray &) + { + throw Exception("Cannot apply function isValidUTF8 to Array argument", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } +}; + +struct NameValidUTF8 +{ + static constexpr auto name = "isValidUTF8"; +}; +using FunctionValidUTF8 = FunctionStringOrArrayToT; + +void registerFunctionValidUTF8(FunctionFactory & factory) +{ + factory.registerFunction(); +} + +} diff --git a/dbms/src/Functions/registerFunctionsString.cpp b/dbms/src/Functions/registerFunctionsString.cpp index 15d37d939b0..f04bbf84c4d 100644 --- a/dbms/src/Functions/registerFunctionsString.cpp +++ b/dbms/src/Functions/registerFunctionsString.cpp @@ -9,6 +9,7 @@ void registerFunctionEmpty(FunctionFactory &); void registerFunctionNotEmpty(FunctionFactory &); void registerFunctionLength(FunctionFactory &); void registerFunctionLengthUTF8(FunctionFactory &); +void registerFunctionValidUTF8(FunctionFactory &); void registerFunctionLower(FunctionFactory &); void registerFunctionUpper(FunctionFactory &); void registerFunctionLowerUTF8(FunctionFactory &); @@ -36,6 +37,7 @@ void registerFunctionsString(FunctionFactory & factory) registerFunctionNotEmpty(factory); registerFunctionLength(factory); registerFunctionLengthUTF8(factory); + registerFunctionValidUTF8(factory); registerFunctionLower(factory); registerFunctionUpper(factory); 
registerFunctionLowerUTF8(factory); diff --git a/dbms/src/IO/BufferWithOwnMemory.h b/dbms/src/IO/BufferWithOwnMemory.h index 4b37d097be5..4849a52344b 100644 --- a/dbms/src/IO/BufferWithOwnMemory.h +++ b/dbms/src/IO/BufferWithOwnMemory.h @@ -24,7 +24,8 @@ namespace DB * Differs in that is doesn't do unneeded memset. (And also tries to do as little as possible.) * Also allows to allocate aligned piece of memory (to use with O_DIRECT, for example). */ -struct Memory : boost::noncopyable, Allocator +template > +struct Memory : boost::noncopyable, Allocator { /// Padding is needed to allow usage of 'memcpySmallAllowReadWriteOverflow15' function with this buffer. static constexpr size_t pad_right = 15; @@ -136,7 +137,7 @@ template class BufferWithOwnMemory : public Base { protected: - Memory memory; + Memory<> memory; public: /// If non-nullptr 'existing_memory' is passed, then buffer will not create its own memory and will use existing_memory without ownership. BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0) diff --git a/dbms/src/IO/UncompressedCache.h b/dbms/src/IO/UncompressedCache.h index 4df3e792c54..86f1530e5b3 100644 --- a/dbms/src/IO/UncompressedCache.h +++ b/dbms/src/IO/UncompressedCache.h @@ -20,8 +20,9 @@ namespace DB struct UncompressedCacheCell { - Memory data; + Memory<> data; size_t compressed_size; + UInt32 additional_bytes; }; struct UncompressedSizeWeightFunction diff --git a/dbms/src/Interpreters/DNSCacheUpdater.cpp b/dbms/src/Interpreters/DNSCacheUpdater.cpp index 2a2d772ffb3..80ea1258f48 100644 --- a/dbms/src/Interpreters/DNSCacheUpdater.cpp +++ b/dbms/src/Interpreters/DNSCacheUpdater.cpp @@ -1,7 +1,7 @@ #include "DNSCacheUpdater.h" #include #include -#include +#include #include #include #include @@ -16,8 +16,6 @@ namespace ProfileEvents namespace DB { -using BackgroundProcessingPoolTaskInfo = BackgroundProcessingPool::TaskInfo; - namespace ErrorCodes { extern const int TIMEOUT_EXCEEDED; @@ -56,18 +54,15 @@ static bool isNetworkError() DNSCacheUpdater::DNSCacheUpdater(Context & context_) - : context(context_), pool(context_.getBackgroundPool()) + : context(context_), pool(context_.getSchedulePool()) { - task_handle = pool.addTask([this] () { return run(); }); + task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); }); } -BackgroundProcessingPoolTaskResult DNSCacheUpdater::run() +void DNSCacheUpdater::run() { - /// TODO: Ensusre that we get global counter (not thread local) auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed); - - if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache - && time(nullptr) > last_update_time + min_update_period_seconds) + if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache) { try { @@ -77,32 +72,18 @@ BackgroundProcessingPoolTaskResult DNSCacheUpdater::run() context.reloadClusterConfig(); last_num_network_erros = num_current_network_exceptions; - last_update_time = time(nullptr); - - return BackgroundProcessingPoolTaskResult::SUCCESS; + task_handle->scheduleAfter(min_update_period_seconds * 1000); + return; } catch (...) 
{ - /// Do not increment ProfileEvents::NetworkErrors twice - if (isNetworkError()) - return BackgroundProcessingPoolTaskResult::ERROR; - - throw; + tryLogCurrentException(__PRETTY_FUNCTION__); } } - /// According to BackgroundProcessingPool logic, if task has done work, it could be executed again immediately. - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; + task_handle->scheduleAfter(10 * 1000); } -DNSCacheUpdater::~DNSCacheUpdater() -{ - if (task_handle) - pool.removeTask(task_handle); - task_handle.reset(); -} - - bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded() { if (isNetworkError()) diff --git a/dbms/src/Interpreters/DNSCacheUpdater.h b/dbms/src/Interpreters/DNSCacheUpdater.h index 885bcc143e3..6d34697c401 100644 --- a/dbms/src/Interpreters/DNSCacheUpdater.h +++ b/dbms/src/Interpreters/DNSCacheUpdater.h @@ -4,35 +4,31 @@ #include #include +#include + namespace DB { class Context; -class BackgroundProcessingPool; -class BackgroundProcessingPoolTaskInfo; -enum class BackgroundProcessingPoolTaskResult; - /// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased class DNSCacheUpdater { public: - explicit DNSCacheUpdater(Context & context); - ~DNSCacheUpdater(); /// Checks if it is a network error and increments ProfileEvents::NetworkErrors static bool incrementNetworkErrorEventsIfNeeded(); private: - BackgroundProcessingPoolTaskResult run(); + void run(); Context & context; - BackgroundProcessingPool & pool; - std::shared_ptr task_handle; + BackgroundSchedulePool & pool; + BackgroundSchedulePoolTaskHolder task_handle; + size_t last_num_network_erros = 0; - time_t last_update_time = 0; static constexpr size_t min_errors_to_update_cache = 3; static constexpr time_t min_update_period_seconds = 45; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 8e69d5ecfee..caf53100179 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -406,7 +407,7 @@ void ExpressionAnalyzer::getAggregates(const ASTPtr & ast, ExpressionActionsPtr getRootActions(arguments[i], true, actions); const std::string & name = arguments[i]->getColumnName(); - types[i] = actions->getSampleBlock().getByName(name).type; + types[i] = recursiveRemoveLowCardinality(actions->getSampleBlock().getByName(name).type); aggregate.argument_names[i] = name; } @@ -974,19 +975,11 @@ void ExpressionAnalyzer::collectUsedColumns() RequiredSourceColumnsVisitor::Data columns_context; RequiredSourceColumnsVisitor(columns_context).visit(query); - NameSet required = columns_context.requiredColumns(); + NameSet source_column_names; + for (const auto & column : source_columns) + source_column_names.insert(column.name); -#if 0 - std::cerr << "Query: " << query << std::endl; - std::cerr << "CTX: " << columns_context << std::endl; - std::cerr << "source_columns: "; - for (const auto & name : source_columns) - std::cerr << "'" << name.name << "' "; - std::cerr << "required: "; - for (const auto & pr : required) - std::cerr << "'" << pr.first << "' "; - std::cerr << std::endl; -#endif + NameSet required = columns_context.requiredColumns(); if (columns_context.has_table_join) { @@ -1013,10 +1006,10 @@ void ExpressionAnalyzer::collectUsedColumns() } } + NameSet array_join_sources; if (columns_context.has_array_join) { /// Insert the columns required for the ARRAY JOIN calculation into the 
required columns list. - NameSet array_join_sources; for (const auto & result_source : syntax->array_join_result_to_source) array_join_sources.insert(result_source.second); @@ -1063,15 +1056,39 @@ void ExpressionAnalyzer::collectUsedColumns() if (!unknown_required_source_columns.empty()) { std::stringstream ss; - ss << "query: '" << query << "' "; - ss << columns_context; - ss << "source_columns: "; - for (const auto & name : source_columns) - ss << "'" << name.name << "' "; + ss << "Missing columns:"; + for (const auto & name : unknown_required_source_columns) + ss << " '" << name << "'"; + ss << " while processing query: '" << query << "'"; - throw Exception("Unknown identifier: " + *unknown_required_source_columns.begin() - + (select_query && !select_query->tables ? ". Note that there are no tables (FROM clause) in your query" : "") - + ", context: " + ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER); + ss << ", required columns:"; + for (const auto & name : columns_context.requiredColumns()) + ss << " '" << name << "'"; + + if (!source_column_names.empty()) + { + ss << ", source columns:"; + for (const auto & name : source_column_names) + ss << " '" << name << "'"; + } + else + ss << ", no source columns"; + + if (columns_context.has_table_join) + { + ss << ", joined columns:"; + for (const auto & column : analyzedJoin().available_joined_columns) + ss << " '" << column.name_and_type.name << "'"; + } + + if (!array_join_sources.empty()) + { + ss << ", arrayJoin columns:"; + for (const auto & name : array_join_sources) + ss << " '" << name << "'"; + } + + throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER); } } diff --git a/dbms/src/Interpreters/ExpressionJIT.cpp b/dbms/src/Interpreters/ExpressionJIT.cpp index 41a8e4e318b..8cb9f2003e1 100644 --- a/dbms/src/Interpreters/ExpressionJIT.cpp +++ b/dbms/src/Interpreters/ExpressionJIT.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 7faaac5f607..08d42331795 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -422,8 +422,8 @@ namespace typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); if (emplace_result.isInserted()) - time_series_map = new (time_series_map) typename Map::mapped_type(); - time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i); + time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); + time_series_map->insert(join.getAsofType(), asof_column, stored_block, i); } }; @@ -511,10 +511,7 @@ void Join::prepareBlockListStructure(Block & stored_block) for (const auto & name : key_names_right) { if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back()) - { - LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name); break; // this is the last column so break is OK - } if (!erased.count(name)) stored_block.erase(stored_block.getPositionByName(name)); @@ -556,8 +553,6 @@ bool Join::insertFromBlock(const Block & block) prepareBlockListStructure(*stored_block); - LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure()); - size_t size = stored_block->columns(); /// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them. 
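For context, the Join hunks above and below rework how ASOF JOIN stores its per-key time series; the feature itself is exercised like this (a sketch modeled on the `00927_asof_join_big` test added later in this diff, so the table and column names come from that test):

``` sql
-- For each row of trades, ASOF JOIN picks the tvs row with an equal key k
-- and the greatest time t that is less than or equal to the trade's t.
SELECT SUM(trades.price - tvs.tv)
FROM trades
ASOF LEFT JOIN tvs USING (k, t);
```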
@@ -720,7 +715,7 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { - if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i)) + if (const RowRef * found = mapped.findAsof(join.getAsofType(), asof_column, i)) { filter[i] = 1; mapped.setUsed(); @@ -1096,7 +1091,6 @@ void Join::joinGet(Block & block, const String & column_name) const void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const { std::shared_lock lock(rwlock); - LOG_DEBUG(log, "joinBlock: " << block.dumpStructure()); checkTypesOfKeys(block, key_names_left, sample_block_with_keys); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 85255aaaaa0..7a223f46b35 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -132,8 +132,6 @@ public: ASTTableJoin::Kind getKind() const { return kind; } AsofRowRefs::Type getAsofType() const { return *asof_type; } - AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; } - const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; } /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again @@ -369,7 +367,6 @@ private: private: Type type = Type::EMPTY; std::optional asof_type; - AsofRowRefs::LookupLists asof_lookup_lists; static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes); diff --git a/dbms/src/Interpreters/RowRefs.cpp b/dbms/src/Interpreters/RowRefs.cpp index 9fea9819132..46e665ab423 100644 --- a/dbms/src/Interpreters/RowRefs.cpp +++ b/dbms/src/Interpreters/RowRefs.cpp @@ -30,51 +30,53 @@ void callWithType(AsofRowRefs::Type which, F && f) } // namespace -void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num) +AsofRowRefs::AsofRowRefs(Type type) { auto call = [&](const auto & t) { - using T = std::decay_t; - using LookupType = typename Entry::LookupType; - - auto * column = typeid_cast *>(asof_column); - T key = column->getElement(row_num); - auto entry = Entry(key, RowRef(block, row_num)); - - std::lock_guard lock(lookup_data.mutex); - - if (!lookups) - { - lookup_data.lookups.push_back(Lookups()); - lookup_data.lookups.back() = LookupType(); - lookups = &lookup_data.lookups.back(); - } - std::get(*lookups).insert(entry); + using T = std::decay_t; + using LookupType = typename Entry::LookupType; + lookups = std::make_unique(); }; callWithType(type, call); } -const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const +void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num) +{ + auto call = [&](const auto & t) + { + using T = std::decay_t; + using LookupPtr = typename Entry::LookupPtr; + + auto * column = typeid_cast *>(asof_column); + T key = column->getElement(row_num); + auto entry = Entry(key, RowRef(block, row_num)); + std::get(lookups)->insert(entry); + }; + + callWithType(type, call); +} + +const RowRef * AsofRowRefs::findAsof(Type type, const IColumn * asof_column, size_t row_num) const { const RowRef * out = nullptr; auto call = [&](const auto & t) { using T = std::decay_t; - using LookupType = typename Entry::LookupType; + using LookupPtr = typename 
Entry::LookupPtr; auto * column = typeid_cast *>(asof_column); T key = column->getElement(row_num); + auto & typed_lookup = std::get(lookups); - std::lock_guard lock(lookup_data.mutex); + // The first thread that calls upper_bound ensures that the data is sorted + auto it = typed_lookup->upper_bound(Entry(key)); - if (!lookups) - return; - - auto & typed_lookup = std::get(*lookups); - auto it = typed_lookup.upper_bound(Entry(key)); - if (it != typed_lookup.cbegin()) + // cbegin() is safe to call now because the array is immutable after sorting, + // hence a pointer to an entry can be returned + if (it != typed_lookup->cbegin()) out = &((--it)->row_ref); }; diff --git a/dbms/src/Interpreters/RowRefs.h b/dbms/src/Interpreters/RowRefs.h index 227fba965b3..23c1b1e0eac 100644 --- a/dbms/src/Interpreters/RowRefs.h +++ b/dbms/src/Interpreters/RowRefs.h @@ -1,12 +1,12 @@ #pragma once #include -#include #include #include #include #include +#include namespace DB { @@ -32,14 +32,70 @@ struct RowRefList : RowRef RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} }; +/** + * This class is a container that sortable data is pushed into. + * When looking up values, the container ensures that it is sorted, giving log(N) lookups. + * After calling any of the lookup methods, it is no longer allowed to insert more data, as this would invalidate the + * references that can be returned by the lookup methods. + */ + +template +class SortedLookupVector +{ +public: + using Base = std::vector; + + // First stage: insertions into the vector + template + void insert(U && x, TAllocatorParams &&... allocator_params) + { + assert(!sorted.load(std::memory_order_acquire)); + array.push_back(std::forward(x), std::forward(allocator_params)...); + } + + // Transition into the second stage: ensures that the vector is sorted + typename Base::const_iterator upper_bound(const T & k) + { + sort(); + return std::upper_bound(array.cbegin(), array.cend(), k); + } + + // After ensuring that the vector is sorted by calling a lookup, these are safe to call + typename Base::const_iterator cbegin() const { return array.cbegin(); } + typename Base::const_iterator cend() const { return array.cend(); } + +private: + std::atomic sorted = false; + Base array; + mutable std::mutex lock; + + // Double-checked locking with SC atomics works in C++ + // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/ + // The first thread that calls one of the lookup methods sorts the data. + // After the first lookup call it is no longer allowed to insert any data; + // the array becomes immutable. + void sort() + { + if (!sorted.load(std::memory_order_acquire)) + { + std::lock_guard l(lock); + if (!sorted.load(std::memory_order_relaxed)) + { + std::sort(array.begin(), array.end()); + sorted.store(true, std::memory_order_release); + } + } + } +}; + class AsofRowRefs { public: template struct Entry { - using LookupType = SortedLookupPODArray>; - + using LookupType = SortedLookupVector>; + using LookupPtr = std::unique_ptr; T asof_value; RowRef row_ref; @@ -53,16 +109,10 @@ public: }; using Lookups = std::variant< - Entry::LookupType, - Entry::LookupType, - Entry::LookupType, - Entry::LookupType>; - - struct LookupLists - { - mutable std::mutex mutex; - std::list lookups; - }; + Entry::LookupPtr, + Entry::LookupPtr, + Entry::LookupPtr, + Entry::LookupPtr>; enum class Type { @@ -72,13 +122,23 @@ public: keyf64, }; + AsofRowRefs() {} + AsofRowRefs(Type t); + static std::optional getTypeSize(const IColumn * asof_column, size_t & 
type_size); - void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num); - const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const; + // This will be synchronized by the rwlock mutex in Join.h + void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num); + + // This will internally synchronize + const RowRef * findAsof(Type type, const IColumn * asof_column, size_t row_num) const; private: - Lookups * lookups = nullptr; + // Lookups can be stored in a HashTable because it is memmovable + // A std::variant contains a currently active type id (memmovable), together with a union of the types + // The types are all std::unique_ptr, which contains a single pointer, which is memmovable. + // Source: https://github.com/yandex/ClickHouse/issues/4906 + Lookups lookups; }; } diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index afbc7855c8f..7f47a76a068 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h index ac7d231d966..748ba19032b 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h @@ -16,7 +16,6 @@ #include - namespace DB { @@ -29,6 +28,8 @@ enum class BackgroundProcessingPoolTaskResult ERROR, NOTHING_TO_DO, }; + + /** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop. * In this case, one task can run simultaneously from different threads. * Designed for tasks that perform continuous background work (for example, merge). 
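The `sort()` method of `SortedLookupVector` above is the double-checked locking idiom its comment links to. A self-contained sketch of the same pattern, with illustrative types rather than the actual template machinery:

``` cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <mutex>
#include <vector>

/// Sketch of the sort-on-first-lookup idiom: inserts happen single-threaded,
/// the first reader sorts under a lock, later readers take the lock-free path.
class LazySortedVector
{
public:
    void insert(int x)
    {
        assert(!sorted.load(std::memory_order_acquire)); // no inserts after the first lookup
        data.push_back(x);
    }

    const std::vector<int> & lookup()
    {
        if (!sorted.load(std::memory_order_acquire))     // fast path: already sorted
        {
            std::lock_guard<std::mutex> guard(mutex);
            if (!sorted.load(std::memory_order_relaxed)) // re-check under the lock
            {
                std::sort(data.begin(), data.end());
                sorted.store(true, std::memory_order_release);
            }
        }
        return data;
    }

private:
    std::atomic<bool> sorted{false};
    std::mutex mutex;
    std::vector<int> data;
};
```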
@@ -45,7 +46,6 @@ public: using TaskHandle = std::shared_ptr; - BackgroundProcessingPool(int size_); size_t getNumberOfThreads() const diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index b226d55978e..0717bdac58c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp index 89f5aaeafd5..4107663f11b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -1,4 +1,3 @@ -#include #include #include diff --git a/dbms/tests/integration/image/Dockerfile b/dbms/tests/integration/image/Dockerfile index d36f9ef0e7b..4db05f74b93 100644 --- a/dbms/tests/integration/image/Dockerfile +++ b/dbms/tests/integration/image/Dockerfile @@ -25,7 +25,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes - ENV TZ=Europe/Moscow RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python protobuf +RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2==2.7.5 pymongo tzlocal kafka-python protobuf redis ENV DOCKER_CHANNEL stable ENV DOCKER_VERSION 17.09.1-ce diff --git a/dbms/tests/performance/trim/trim_whitespace.xml b/dbms/tests/performance/trim/trim_whitespace.xml index 9ef5cf92611..cf4a5dac896 100644 --- a/dbms/tests/performance/trim/trim_whitespace.xml +++ b/dbms/tests/performance/trim/trim_whitespace.xml @@ -3,7 +3,7 @@ loop CREATE TABLE IF NOT EXISTS whitespaces(value String) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple() - INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678) + INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678) diff --git a/dbms/tests/queries/0_stateless/00818_alias_bug_4110.reference b/dbms/tests/queries/0_stateless/00818_alias_bug_4110.reference index 5186cb8eeff..e6013d269c2 100644 --- a/dbms/tests/queries/0_stateless/00818_alias_bug_4110.reference +++ b/dbms/tests/queries/0_stateless/00818_alias_bug_4110.reference @@ -3,6 +3,12 @@ 11 11 11 12 12 11 +10 10 +10 11 11 +12 11 +10 12 +11 12 +11 12 0 1 123 456 diff --git a/dbms/tests/queries/0_stateless/00818_alias_bug_4110.sql b/dbms/tests/queries/0_stateless/00818_alias_bug_4110.sql index 7480f137a65..6cab0f1995c 100644 --- a/dbms/tests/queries/0_stateless/00818_alias_bug_4110.sql +++ b/dbms/tests/queries/0_stateless/00818_alias_bug_4110.sql @@ -4,6 +4,17 @@ select s.a + 1 as a, s.a + 1 as b from (select 10 as a) s; select s.a + 1 as b, s.a + 2 as a from (select 10 as a) s; select s.a + 2 as b, s.a + 1 as a from (select 10 as a) s; +select a, a as a from (select 10 as a); +select s.a, a, a + 1 as a from (select 10 as a) as s; +select s.a + 2 as b, b - 1 as a from (select 10 as a) s; +select s.a as a, s.a + 2 as b from (select 10 as a) s; +select s.a + 1 as a, s.a + 2 as b from (select 10 as a) s; 
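-- For reference (explanatory comment, not part of the original test file): the
-- `serverError 174` expectations below correspond to CYCLIC_ALIASES (an alias
-- defined in terms of another alias that refers back to it), and
-- `serverError 179` to MULTIPLE_EXPRESSIONS_FOR_ALIAS (one alias bound to two
-- different expressions).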
+select a + 1 as a, a + 1 as b from (select 10 as a); +select a + 1 as b, b + 1 as a from (select 10 as a); -- { serverError 174 } +select 10 as a, a + 1 as a; -- { serverError 179 } +with 10 as a select a as a; -- { serverError 179 } +with 10 as a select a + 1 as a; -- { serverError 179 } + SELECT 0 as t FROM (SELECT 1 as t) as inn WHERE inn.t = 1; SELECT sum(value) as value FROM (SELECT 1 as value) as data WHERE data.value > 0; diff --git a/dbms/tests/queries/0_stateless/00917_least_sqr.reference b/dbms/tests/queries/0_stateless/00917_least_sqr.reference new file mode 100644 index 00000000000..8abd62892db --- /dev/null +++ b/dbms/tests/queries/0_stateless/00917_least_sqr.reference @@ -0,0 +1,8 @@ +(10,90) +(10.3,89.5) +(10,-90) +(1,1) +(nan,nan) +(0,3) +(nan,nan) +(nan,nan) diff --git a/dbms/tests/queries/0_stateless/00917_least_sqr.sql b/dbms/tests/queries/0_stateless/00917_least_sqr.sql new file mode 100644 index 00000000000..729d140ca30 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00917_least_sqr.sql @@ -0,0 +1,9 @@ +select arrayReduce('leastSqr', [1, 2, 3, 4], [100, 110, 120, 130]); +select arrayReduce('leastSqr', [1, 2, 3, 4], [100, 110, 120, 131]); +select arrayReduce('leastSqr', [-1, -2, -3, -4], [-100, -110, -120, -130]); +select arrayReduce('leastSqr', [5, 5.1], [6, 6.1]); +select arrayReduce('leastSqr', [0], [0]); +select arrayReduce('leastSqr', [3, 4], [3, 3]); +select arrayReduce('leastSqr', [3, 3], [3, 4]); +select arrayReduce('leastSqr', emptyArrayUInt8(), emptyArrayUInt8()); + diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_big.reference b/dbms/tests/queries/0_stateless/00927_asof_join_big.reference new file mode 100644 index 00000000000..0d0ce1ea4e8 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_big.reference @@ -0,0 +1 @@ +75000000 diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_big.sql b/dbms/tests/queries/0_stateless/00927_asof_join_big.sql new file mode 100644 index 00000000000..5e49460089d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_big.sql @@ -0,0 +1,31 @@ +USE test; + +DROP TABLE IF EXISTS tvs; +DROP TABLE IF EXISTS trades; +DROP TABLE IF EXISTS keys; +DROP TABLE IF EXISTS tv_times; +DROP TABLE IF EXISTS trade_times; + +CREATE TABLE keys(k UInt32) ENGINE = MergeTree() ORDER BY k; +INSERT INTO keys(k) SELECT number FROM system.numbers LIMIT 5000; + +CREATE TABLE tv_times(t UInt32) ENGINE = MergeTree() ORDER BY t; +INSERT INTO tv_times(t) SELECT number * 3 FROM system.numbers LIMIT 50000; + +CREATE TABLE trade_times(t UInt32) ENGINE = MergeTree() ORDER BY t; +INSERT INTO trade_times(t) SELECT number * 10 FROM system.numbers LIMIT 15000; + +CREATE TABLE tvs(k UInt32, t UInt32, tv UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO tvs(k,t,tv) SELECT k, t, t FROM keys CROSS JOIN tv_times; + +CREATE TABLE trades(k UInt32, t UInt32, price UInt64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO trades(k,t,price) SELECT k, t, t FROM keys CROSS JOIN trade_times; + +SELECT SUM(trades.price - tvs.tv) FROM trades ASOF LEFT JOIN tvs USING(k,t); + + +DROP TABLE tvs; +DROP TABLE trades; +DROP TABLE keys; +DROP TABLE tv_times; +DROP TABLE trade_times; diff --git a/dbms/tests/queries/0_stateless/00931_low_cardinality_nullable_aggregate_function_type.reference b/dbms/tests/queries/0_stateless/00931_low_cardinality_nullable_aggregate_function_type.reference new file mode 100644 index 00000000000..866b45bd9ea --- /dev/null +++ 
b/dbms/tests/queries/0_stateless/00931_low_cardinality_nullable_aggregate_function_type.reference @@ -0,0 +1 @@ +2019-01-01 \N diff --git a/dbms/tests/queries/0_stateless/00931_low_cardinality_nullable_aggregate_function_type.sql b/dbms/tests/queries/0_stateless/00931_low_cardinality_nullable_aggregate_function_type.sql new file mode 100644 index 00000000000..04089aef377 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00931_low_cardinality_nullable_aggregate_function_type.sql @@ -0,0 +1,8 @@ +drop table if exists test.lc; + +CREATE TABLE test.lc (`date` Date, `name` LowCardinality(Nullable(String)), `clicks` Nullable(Int32)) ENGINE = MergeTree() ORDER BY date SETTINGS index_granularity = 8192; +INSERT INTO test.lc SELECT '2019-01-01', null, 0 FROM numbers(1000000); +SELECT date, argMax(name, clicks) FROM test.lc GROUP BY date; + +drop table if exists test.lc; + diff --git a/dbms/tests/queries/0_stateless/00933_reserved_word.reference b/dbms/tests/queries/0_stateless/00933_reserved_word.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00933_reserved_word.sql b/dbms/tests/queries/0_stateless/00933_reserved_word.sql new file mode 100644 index 00000000000..8e463fbffb9 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00933_reserved_word.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS test.reserved_word_table; +CREATE TABLE test.reserved_word_table (`index` UInt8) ENGINE = MergeTree ORDER BY `index`; + +DETACH TABLE test.reserved_word_table; +ATTACH TABLE test.reserved_word_table; + +DROP TABLE test.reserved_word_table; diff --git a/dbms/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference b/dbms/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference new file mode 100644 index 00000000000..797f208c02b --- /dev/null +++ b/dbms/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference @@ -0,0 +1 @@ +0 36 14 diff --git a/dbms/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.sh b/dbms/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.sh new file mode 100755 index 00000000000..b34d5072d3e --- /dev/null +++ b/dbms/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. 
$CURDIR/../shell_config.sh + + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.small_table" + +$CLICKHOUSE_CLIENT --query="CREATE TABLE test.small_table (a UInt64 default 0, n UInt64) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY (a);" + +$CLICKHOUSE_CLIENT --query="INSERT INTO test.small_table(n) SELECT * from system.numbers limit 100000;" + +cached_query="SELECT count() FROM test.small_table where n > 0;" + +$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query="$cached_query" &> /dev/null + +$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null + +sleep 1 +$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS" + +$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'Seek')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ReadCompressedBytes')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UncompressedCacheHits')] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') AND (type = 2) ORDER BY event_time DESC LIMIT 1" + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.small_table" + diff --git a/dbms/tests/queries/0_stateless/00934_is_valid_utf8.reference b/dbms/tests/queries/0_stateless/00934_is_valid_utf8.reference new file mode 100644 index 00000000000..1964677cc54 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00934_is_valid_utf8.reference @@ -0,0 +1,1200 @@ +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 
+1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 diff --git a/dbms/tests/queries/0_stateless/00934_is_valid_utf8.sql b/dbms/tests/queries/0_stateless/00934_is_valid_utf8.sql new file mode 100644 index 00000000000..e4075656e81 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00934_is_valid_utf8.sql @@ -0,0 +1,127 @@ +select 1 = isValidUTF8('') from system.numbers limit 10; +select 1 = isValidUTF8('some text') from system.numbers limit 10; +select 1 = isValidUTF8('какой-то текст') from system.numbers limit 10; +select 1 = isValidUTF8('\x00') from system.numbers limit 10; +select 1 = isValidUTF8('\x66') from system.numbers limit 10; +select 1 = isValidUTF8('\x7F') from system.numbers limit 10; +select 1 = isValidUTF8('\x00\x7F') from system.numbers limit 10; +select 1 = isValidUTF8('\x7F\x00') from system.numbers limit 10; +select 1 = isValidUTF8('\xC2\x80') from system.numbers limit 10; +select 1 = isValidUTF8('\xDF\xBF') from system.numbers limit 10; +select 1 = isValidUTF8('\xE0\xA0\x80') from system.numbers limit 10; +select 1 = isValidUTF8('\xE0\xA0\xBF') from system.numbers limit 10; +select 1 = isValidUTF8('\xED\x9F\x80') from system.numbers limit 10; +select 1 = isValidUTF8('\xEF\x80\xBF') from system.numbers limit 10; +select 1 = isValidUTF8('\xF0\x90\xBF\x80') from system.numbers limit 10; +select 1 = isValidUTF8('\xF2\x81\xBE\x99') from system.numbers limit 10; +select 1 = isValidUTF8('\xF4\x8F\x88\xAA') from system.numbers limit 10; + +select 1 = isValidUTF8('a') from system.numbers limit 10; +select 1 = isValidUTF8('\xc3\xb1') from system.numbers limit 10; +select 1 = isValidUTF8('\xe2\x82\xa1') from system.numbers limit 10; +select 1 = isValidUTF8('\xf0\x90\x8c\xbc') from system.numbers limit 10; +select 1 = isValidUTF8('안녕하세요, 세상') from system.numbers limit 10; + +select 0 = isValidUTF8('\xc3\x28') from system.numbers limit 10; +select 0 = isValidUTF8('\xa0\xa1') from system.numbers limit 10; +select 0 = isValidUTF8('\xe2\x28\xa1') from system.numbers limit 10; +select 0 = isValidUTF8('\xe2\x82\x28') from system.numbers limit 10; +select 0 = 
isValidUTF8('\xf0\x28\x8c\xbc') from system.numbers limit 10; +select 0 = isValidUTF8('\xf0\x90\x28\xbc') from system.numbers limit 10; +select 0 = isValidUTF8('\xf0\x28\x8c\x28') from system.numbers limit 10; +select 0 = isValidUTF8('\xc0\x9f') from system.numbers limit 10; +select 0 = isValidUTF8('\xf5\xff\xff\xff') from system.numbers limit 10; +select 0 = isValidUTF8('\xed\xa0\x81') from system.numbers limit 10; +select 0 = isValidUTF8('\xf8\x90\x80\x80\x80') from system.numbers limit 10; +select 0 = isValidUTF8('12345678901234\xed') from system.numbers limit 10; +select 0 = isValidUTF8('123456789012345\xed') from system.numbers limit 10; +select 0 = isValidUTF8('123456789012345\xed123456789012345\xed') from system.numbers limit 10; +select 0 = isValidUTF8('123456789012345\xf1') from system.numbers limit 10; +select 0 = isValidUTF8('123456789012345\xc2') from system.numbers limit 10; +select 0 = isValidUTF8('\xC2\x7F') from system.numbers limit 10; + +select 0 = isValidUTF8('\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xBF') from system.numbers limit 10; +select 0 = isValidUTF8('\xC0\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xC1\x00') from system.numbers limit 10; +select 0 = isValidUTF8('\xC2\x7F') from system.numbers limit 10; +select 0 = isValidUTF8('\xDF\xC0') from system.numbers limit 10; +select 0 = isValidUTF8('\xE0\x9F\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xE0\xC2\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xED\xA0\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xED\x7F\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xEF\x80\x00') from system.numbers limit 10; +select 0 = isValidUTF8('\xF0\x8F\x80\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xF0\xEE\x80\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\xF2\x90\x91\x7F') from system.numbers limit 10; +select 0 = isValidUTF8('\xF4\x90\x88\xAA') from system.numbers limit 10; +select 0 = isValidUTF8('\xF4\x00\xBF\xBF') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\xC2\x80\x00\x00\x00\xE1\x80\x80\x00\x00\xC2\xC2\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\xC2\xC2\x80\x00\x00\xE1\x80\x80\x00\x00\x00') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\xC2\x80') from system.numbers limit 10; +select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF0\x80\x80\x80') from system.numbers limit 10; + +select 1 = isValidUTF8(toFixedString('some text', 9)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('какой-то текст', 27)) from system.numbers limit 10; +select 1 = 
isValidUTF8(toFixedString('\x00', 1)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\x66', 1)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\x7F', 1)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\x00\x7F', 2)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\x7F\x00', 2)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xC2\x80', 2)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xDF\xBF', 2)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xE0\xA0\x80', 3)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xE0\xA0\xBF', 3)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xED\x9F\x80', 3)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xEF\x80\xBF', 3)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xF0\x90\xBF\x80', 4)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xF2\x81\xBE\x99', 4)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xF4\x8F\x88\xAA', 4)) from system.numbers limit 10; + +select 0 = isValidUTF8(toFixedString('\x80', 1)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xBF', 1)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xC0\x80', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xC1\x00', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xC2\x7F', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xDF\xC0', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xE0\x9F\x80', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xE0\xC2\x80', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xED\xA0\x80', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xED\x7F\x80', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xEF\x80\x00', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xF0\x8F\x80\x80', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xF0\xEE\x80\x80', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xF2\x90\x91\x7F', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xF4\x90\x88\xAA', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xF4\x00\xBF\xBF', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\xC2\x80\x00\x00\x00\xE1\x80\x80\x00\x00\xC2\xC2\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 32)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\xC2\xC2\x80\x00\x00\xE1\x80\x80\x00\x00\x00', 16)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80', 32)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1', 32)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\x80', 33)) from system.numbers limit 10; 
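-- (explanatory note: the invalid sequences exercised above and below cover the
-- classic UTF-8 failure modes: overlong encodings such as \xC0\x80, UTF-16
-- surrogate halves such as \xED\xA0\x80, code points above U+10FFFF such as
-- \xF4\x90\x88\xAA, and multi-byte sequences truncated at the end of the value.)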
+select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\xC2\x80', 34)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF0\x80\x80\x80', 35)) from system.numbers limit 10; + +select 1 = isValidUTF8(toFixedString('a', 1)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xc3\xb1', 2)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xe2\x82\xa1', 3)) from system.numbers limit 10; +select 1 = isValidUTF8(toFixedString('\xf0\x90\x8c\xbc', 4)) from system.numbers limit 10; + +select 0 = isValidUTF8(toFixedString('\xc3\x28', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xa0\xa1', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xe2\x28\xa1', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xe2\x82\x28', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xf0\x28\x8c\xbc', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xf0\x90\x28\xbc', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xf0\x28\x8c\x28', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xc0\x9f', 2)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xf5\xff\xff\xff', 4)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xed\xa0\x81', 3)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xf8\x90\x80\x80\x80', 5)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('123456789012345\xed', 16)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('123456789012345\xf1', 16)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('123456789012345\xc2', 16)) from system.numbers limit 10; +select 0 = isValidUTF8(toFixedString('\xC2\x7F', 2)) from system.numbers limit 10; diff --git a/docs/en/operations/utils/clickhouse-copier.md b/docs/en/operations/utils/clickhouse-copier.md index fac374b4790..57358d49f90 100644 --- a/docs/en/operations/utils/clickhouse-copier.md +++ b/docs/en/operations/utils/clickhouse-copier.md @@ -32,6 +32,8 @@ Parameters: - `daemon` — Starts `clickhouse-copier` in daemon mode. - `config` — The path to the `zookeeper.xml` file with the parameters for the connection to ZooKeeper. - `task-path` — The path to the ZooKeeper node. This node is used for syncing `clickhouse-copier` processes and storing tasks. Tasks are stored in `$task-path/description`. +- `task-file` — Optional path to file with task configuration for initial upload to ZooKeeper. +- `task-upload-force` — Force upload `task-file` even if node already exists. - `base-dir` — The path to logs and auxiliary files. When it starts, `clickhouse-copier` creates `clickhouse-copier_YYYYMMHHSS_` subdirectories in `$base-dir`. If this parameter is omitted, the directories are created in the directory where `clickhouse-copier` was launched. ## Format of zookeeper.xml diff --git a/docs/en/query_language/functions/string_functions.md b/docs/en/query_language/functions/string_functions.md index 8d4dd838282..448a73789aa 100644 --- a/docs/en/query_language/functions/string_functions.md +++ b/docs/en/query_language/functions/string_functions.md @@ -56,6 +56,10 @@ It doesn't detect the language. 
So for Turkish the result might not be exactly correct. If the length of the UTF-8 byte sequence is different for upper and lower case of a code point, the result may be incorrect for this code point. If the string contains a set of bytes that is not UTF-8, then the behavior is undefined. +## isValidUTF8 + +Returns 1 if the set of bytes is valid UTF-8-encoded text, otherwise 0. + ## reverse Reverses the string (as a sequence of bytes). diff --git a/docs/en/query_language/select.md b/docs/en/query_language/select.md index 7c4aa82e379..458fa732f81 100644 --- a/docs/en/query_language/select.md +++ b/docs/en/query_language/select.md @@ -46,22 +46,38 @@ The FINAL modifier can be used only for a SELECT from a CollapsingMergeTree tabl ### SAMPLE Clause {#select-sample-clause} -The `SAMPLE` clause allows for approximated query processing. Approximated query processing is only supported by the tables in the `MergeTree` family, and only if the sampling expression was specified during table creation (see [MergeTree engine](../operations/table_engines/mergetree.md)). +The `SAMPLE` clause allows for approximated query processing. + +When data sampling is enabled, the query is not performed on all the data, but only on a certain fraction of data (sample). For example, if you need to calculate statistics for all the visits, it is enough to execute the query on the 1/10 fraction of all the visits and then multiply the result by 10. + +Approximated query processing can be useful in the following cases: + +- When you have strict timing requirements (like <100ms) but you can't justify the cost of additional hardware resources to meet them. +- When your raw data is not accurate, so approximation doesn't noticeably degrade the quality. +- When business requirements target approximate results (for cost-effectiveness, or in order to market exact results to premium users). + +!!! note + You can only use sampling with the tables in the [MergeTree](../operations/table_engines/mergetree.md) family, and only if the sampling expression was specified during table creation (see [MergeTree engine](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table)). The features of data sampling are listed below: - Data sampling is a deterministic mechanism. The result of the same `SELECT .. SAMPLE` query is always the same. -- Sampling works consistently for different tables. For tables with a single sampling key, a sample with the same coefficient always selects the same subset of possible data. For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This means that you can use the sample in subqueries in the `IN` clause, as well as manually correlate results of different queries with samples. +- Sampling works consistently for different tables. For tables with a single sampling key, a sample with the same coefficient always selects the same subset of possible data. For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This means that you can use the sample in subqueries in the [IN](#select-in-operators) clause (see the example after this list). Also, you can join samples using the [JOIN](#select-join) clause. - Sampling allows reading less data from a disk. Note that you must specify the sampling key correctly. For more information, see [Creating a MergeTree Table](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table).
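For example, assuming two hypothetical tables `visits` and `orders` that share the sampling key `UserID`, a sample of one table can be correlated with a sample of the other:

``` sql
SELECT count()
FROM visits
SAMPLE 1/10
WHERE UserID IN
(
    SELECT UserID
    FROM orders
    SAMPLE 1/10
)
```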
For the `SAMPLE` clause the following syntax is supported: -- `SAMPLE k`, where `k` is a decimal number from 0 to 1. The query is executed on `k` fraction of data. For example, `SAMPLE 0.1` runs the query on 10% of data. [Read more](#select-sample-k) -- `SAMPLE n`, where `n` is a sufficiently large integer. The query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows. [Read more](#select-sample-n) -- `SAMPLE k OFFSET m` where `k` and `m` are numbers from 0 to 1. The query is executed on a sample of `k` percent of the data. The data used for the sample is offset by `m` percent. [Read more](#select-sample-offset) +| SAMPLE Clause Syntax | Description | +| ---------------- | --------- | +| `SAMPLE k` | Here `k` is a number from 0 to 1.
The query is executed on a `k` fraction of the data. For example, `SAMPLE 0.1` runs the query on 10% of the data. [Read more](#select-sample-k)| +| `SAMPLE n` | Here `n` is a sufficiently large integer.
The query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows. [Read more](#select-sample-n) | +| `SAMPLE k OFFSET m` | Here `k` and `m` are numbers from 0 to 1.
The query is executed on a sample of a `k` fraction of the data, offset by an `m` fraction. [Read more](#select-sample-offset) | + #### SAMPLE k {#select-sample-k} +Here `k` is a number from 0 to 1 (both fractional and decimal notations are supported). For example, `SAMPLE 1/2` or `SAMPLE 0.5`. + In a `SAMPLE k` clause, the sample is taken from the `k` fraction of data. The example is shown below: ``` sql @@ -76,25 +92,29 @@ GROUP BY Title ORDER BY PageViews DESC LIMIT 1000 ``` -In this example, the query is executed on a sample from 0.1 (10%) of data. Values of aggregate functions are not corrected automatically, so to get an approximate result, the value 'count()' is manually multiplied by 10. +In this example, the query is executed on a sample from 0.1 (10%) of data. Values of aggregate functions are not corrected automatically, so to get an approximate result, the value `count()` is manually multiplied by 10. #### SAMPLE n {#select-sample-n} -In this case, the query is executed on a sample of at least `n` rows, where `n` is a sufficiently large integer. For example, `SAMPLE 10000000`. +Here `n` is a sufficiently large integer. For example, `SAMPLE 10000000`. + +In this case, the query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows. Since the minimum unit for data reading is one granule (its size is set by the `index_granularity` setting), it makes sense to set a sample that is much larger than the size of the granule. -When using the `SAMPLE n` clause, the relative coefficient is calculated dynamically. Since you do not know which relative percent of data was processed, you do not know the coefficient the aggregate functions should be multiplied by (for example, you do not know if `SAMPLE 1000000` was taken from a set of 10,000,000 rows or from a set of 1,000,000,000 rows). In this case, use the `_sample_factor` virtual column to get the approximate result. +When using the `SAMPLE n` clause, you don't know which relative percent of data was processed. So you don't know the coefficient the aggregate functions should be multiplied by. Use the `_sample_factor` virtual column to get the approximate result. -The `_sample_factor` column is where ClickHouse stores relative coefficients. This column is created automatically when you create a table with the specified sampling key. The usage example is shown below: +The `_sample_factor` column contains relative coefficients that are calculated dynamically. This column is created automatically when you [create](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table) a table with the specified sampling key. Usage examples of the `_sample_factor` column are shown below. + +Let's consider the table `visits`, which contains statistics about site visits. The first example shows how to calculate the number of page views: ``` sql -SELECT sum(Duration * _sample_factor) +SELECT sum(PageViews * _sample_factor) FROM visits SAMPLE 10000000 ``` -If you need to get the approximate count of rows in a `SELECT .. SAMPLE n` query, get the sum() of the `_sample_factor` column instead of counting the `count(*) * _sample_factor` value. For example: +The next example shows how to calculate the total number of visits: ``` sql SELECT sum(_sample_factor) @@ -102,7 +122,7 @@ FROM visits SAMPLE 10000000 ``` -Note that to calculate the average in a `SELECT .. 
SAMPLE n` query, you do not need to use the `_sample_factor` column: +The example below shows how to calculate the average session duration. Note that you don't need to use the relative coefficient to calculate the average values. ``` sql SELECT avg(Duration) @@ -112,9 +132,9 @@ SAMPLE 10000000 #### SAMPLE k OFFSET m {#select-sample-offset} -You can specify the `SAMPLE k OFFSET m` clause, where `k` and `m` are numbers from 0 to 1. Examples are shown below. +Here `k` and `m` are numbers from 0 to 1. Examples are shown below. -Example 1. +**Example 1** ``` sql SAMPLE 1/10 @@ -124,13 +144,13 @@ In this example, the sample is 1/10th of all data: `[++------------------]` -Example 2. +**Example 2** ``` sql SAMPLE 1/10 OFFSET 1/2 ``` -Here, a sample of 10% is taken from the second half of data. +Here, a sample of 10% is taken from the second half of the data. `[----------++--------]` diff --git a/docs/ru/operations/utils/clickhouse-copier.md b/docs/ru/operations/utils/clickhouse-copier.md index 0c852450457..b38e25f6c16 100644 --- a/docs/ru/operations/utils/clickhouse-copier.md +++ b/docs/ru/operations/utils/clickhouse-copier.md @@ -31,6 +31,8 @@ clickhouse-copier copier --daemon --config zookeeper.xml --task-path /task/path - `daemon` - запускает `clickhouse-copier` в режиме демона. - `config` - путь к файлу `zookeeper.xml` с параметрами соединения с ZooKeeper. - `task-path` - путь к ноде ZooKeeper. Нода используется для синхронизации между процессами `clickhouse-copier` и для хранения заданий. Задания хранятся в `$task-path/description`. +- `task-file` - необязательный путь к файлу с описанием конфигурация заданий для загрузки в ZooKeeper. +- `task-upload-force` - Загрузить `task-file` в ZooKeeper даже если уже было загружено. - `base-dir` - путь к логам и вспомогательным файлам. При запуске `clickhouse-copier` создает в `$base-dir` подкаталоги `clickhouse-copier_YYYYMMHHSS_`. Если параметр не указан, то каталоги будут создаваться в каталоге, где `clickhouse-copier` был запущен. ## Формат zookeeper.xml diff --git a/docs/ru/query_language/functions/string_functions.md b/docs/ru/query_language/functions/string_functions.md index 653342ebbd4..c180909a968 100644 --- a/docs/ru/query_language/functions/string_functions.md +++ b/docs/ru/query_language/functions/string_functions.md @@ -38,6 +38,9 @@ Если длина UTF-8 последовательности байт различна для верхнего и нижнего регистра кодовой точки, то для этой кодовой точки, результат работы может быть некорректным. Если строка содержит набор байт, не являющийся UTF-8, то поведение не определено. +## isValidUTF8 +Возвращает 1, если набор байт является корректным в кодировке UTF-8, 0 иначе. + ## reverse Разворачивает строку (как последовательность байт). diff --git a/docs/ru/query_language/select.md b/docs/ru/query_language/select.md index 153e20bd8df..427b9d18286 100644 --- a/docs/ru/query_language/select.md +++ b/docs/ru/query_language/select.md @@ -45,19 +45,42 @@ SELECT [DISTINCT] expr_list Модификатор FINAL может быть использован только при SELECT-е из таблицы типа CollapsingMergeTree. При указании FINAL, данные будут выбираться полностью "сколлапсированными". Стоит учитывать, что использование FINAL приводит к выбору кроме указанных в SELECT-е столбцов также столбцов, относящихся к первичному ключу. Также, запрос будет выполняться в один поток, и при выполнении запроса будет выполняться слияние данных. Это приводит к тому, что при использовании FINAL, запрос выполняется медленнее. В большинстве случаев, следует избегать использования FINAL. 
Подробнее смотрите раздел "Движок CollapsingMergeTree". +### Секция SAMPLE {#select-sample-clause} -### Секция SAMPLE +Секция `SAMPLE` позволяет выполнять запросы приближённо. Например, чтобы посчитать статистику по всем визитам, можно обработать 1/10 всех визитов и результат домножить на 10. -Секция SAMPLE позволяет выполнить запрос приближённо. Приближённое выполнение запроса поддерживается только таблицами типа MergeTree\* и только если при создании таблицы было указано выражение, по которому производится выборка (смотрите раздел "Движок MergeTree"). +Сэмплирование имеет смысл, когда: -`SAMPLE` имеет вид `SAMPLE k`, где `k` - дробное число в интервале от 0 до 1, или `SAMPLE n`, где n - достаточно большое целое число. +1. Точность результата не важна, например, для оценочных расчетов. +2. Возможности аппаратной части не позволяют соответствовать строгим критериям. Например, время ответа должно быть <100 мс. При этом точность расчета имеет более низкий приоритет. +3. Точность результата участвует в бизнес-модели сервиса. Например, пользователи с бесплатной подпиской на сервис могут получать отчеты с меньшей точностью, чем пользователи с премиум подпиской. -В первом случае, запрос будет выполнен по k-доле данных. Например, если указано `SAMPLE 0.1`, то запрос будет выполнен по 10% данных. -Во втором случае, запрос будет выполнен по выборке из не более n строк. Например, если указано `SAMPLE 10000000`, то запрос будет выполнен по не более чем 10 000 000 строкам. +!!! note "Внимание" + Не стоит использовать сэмплирование в тех задачах, где важна точность расчетов. Например, при работе с финансовыми отчетами. -Пример: +Свойства сэмплирования: -``` sql +- Сэмплирование работает детерминированно. При многократном выполнении одного и того же запроса `SELECT .. SAMPLE`, результат всегда будет одинаковым. +- Сэмплирование поддерживает консистентность для разных таблиц. Имеется в виду, что для таблиц с одним и тем же ключом сэмплирования, подмножество данных в выборках будет одинаковым (выборки при этом должны быть сформированы для одинаковой доли данных). Например, выборка по идентификаторам посетителей выберет из разных таблиц строки с одинаковым подмножеством всех возможных идентификаторов. Это свойство позволяет использовать выборки в подзапросах в секции [IN](#select-in-operators), а также объединять выборки с помощью [JOIN](#select-join). +- Сэмплирование позволяет читать меньше данных с диска. Обратите внимание, для этого необходимо корректно указать ключ сэмплирования. Подробнее см. в разделе [Создание таблицы MergeTree](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table). + +Сэмплирование поддерживается только таблицами семейства [MergeTree](../operations/table_engines/mergetree.md) и только в том случае, если для таблиц был указан ключ сэмплирования (выражение, на основе которого должна производиться выборка). Подробнее см. в разделе [Создание таблиц MergeTree](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table). + +Выражение `SAMPLE` в запросе можно задать следующими способами: + +| Способ задания SAMPLE| Описание | +| ---------------- | --------- | +| `SAMPLE k` | Здесь `k` – это дробное число в интервале от 0 до 1.
Запрос будет выполнен по `k` доле данных. Например, если указано `SAMPLE 1/10`, то запрос будет выполнен для выборки из 1/10 данных. [Подробнее](#select-sample-k)| +| `SAMPLE n` | Здесь `n` – это достаточно большое целое число.
Запрос будет выполнен для выборки, состоящей из не менее чем `n` строк. Например, если указано `SAMPLE 10000000`, то запрос будет выполнен для не менее чем 10,000,000 строк. [Подробнее](#select-sample-n) | +| `SAMPLE k OFFSET m` | Здесь `k` и `m` – числа от 0 до 1.
Запрос будет выполнен по `k` доле данных. При этом выборка будет сформирована со смещением на `m` долю. [Подробнее](#select-sample-offset) | + +#### SAMPLE k {#select-sample-k} + +Здесь `k` – число в интервале от 0 до 1. Поддерживается как дробная, так и десятичная форма записи. Например, `SAMPLE 1/2` или `SAMPLE 0.5`. + +Если задано выражение `SAMPLE k`, запрос будет выполнен для `k` доли данных. Рассмотрим пример: + +```sql SELECT Title, count() * 10 AS PageViews @@ -65,22 +88,76 @@ FROM hits_distributed SAMPLE 0.1 WHERE CounterID = 34 - AND toDate(EventDate) >= toDate('2013-01-29') - AND toDate(EventDate) <= toDate('2013-02-04') - AND NOT DontCountHits - AND NOT Refresh - AND Title != '' GROUP BY Title ORDER BY PageViews DESC LIMIT 1000 ``` -В этом примере, запрос выполняется по выборке из 0.1 (10%) данных. Значения агрегатных функций не корректируются автоматически, поэтому для получения приближённого результата, значение count() вручную домножается на 10. +В этом примере запрос выполняется по выборке из 0.1 (10%) данных. Значения агрегатных функций не корректируются автоматически, поэтому чтобы получить приближённый результат, значение `count()` нужно вручную умножить на 10. -При использовании варианта вида `SAMPLE 10000000`, нет информации, какая относительная доля данных была обработана, и на что следует домножить агрегатные функции, поэтому такой способ записи подходит не для всех случаев. +Выборка с указанием относительного коэффициента является "согласованной": для таблиц с одним и тем же ключом сэмплирования, выборка с одинаковой относительной долей всегда будет составлять одно и то же подмножество данных. То есть выборка из разных таблиц, на разных серверах, в разное время, формируется одинаковым образом. -Выборка с указанием относительного коэффициента является "согласованной": если рассмотреть все возможные данные, которые могли бы быть в таблице, то выборка (при использовании одного выражения сэмплирования, указанного при создании таблицы), с одинаковым коэффициентом, выбирает всегда одно и то же подмножество этих всевозможных данных. То есть, выборка из разных таблиц, на разных серверах, в разное время, делается одинаковым образом. +#### SAMPLE n {#select-sample-n} -Например, выборка по идентификаторам посетителей, выберет из разных таблиц строки с одинаковым подмножеством всех возможных идентификаторов посетителей. Это позволяет использовать выборку в подзапросах в секции IN, а также при ручном сопоставлении результатов разных запросов с выборками. +Здесь `n` – это достаточно большое целое число. Например, `SAMPLE 10000000`. + +Если задано выражение `SAMPLE n`, запрос будет выполнен для выборки из не менее `n` строк (но не значительно больше этого значения). Например, если задать `SAMPLE 10000000`, в выборку попадут не менее 10,000,000 строк. + +!!! note "Примечание" + Следует иметь в виду, что `n` должно быть достаточно большим числом. Так как минимальной единицей данных для чтения является одна гранула (её размер задаётся настройкой `index_granularity` для таблицы), имеет смысл создавать выборки, размер которых существенно превосходит размер гранулы. + +При выполнении `SAMPLE n` коэффициент сэмплирования заранее неизвестен (то есть нет информации о том, относительно какого количества данных будет сформирована выборка). Чтобы узнать коэффициент сэмплирования, используйте столбец `_sample_factor`. + +Виртуальный столбец `_sample_factor` автоматически создается в тех таблицах, для которых задано выражение `SAMPLE BY` (подробнее см. 
в разделе [Создание таблицы MergeTree](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table)). В столбце содержится коэффициент сэмплирования для таблицы – он рассчитывается динамически по мере добавления данных в таблицу. Ниже приведены примеры использования столбца `_sample_factor`. + +Предположим, у нас есть таблица, в которой ведется статистика посещений сайта. Пример ниже показывает, как рассчитать суммарное число просмотров: + +```sql +SELECT sum(PageViews * _sample_factor) +FROM visits +SAMPLE 10000000 +``` + +Следующий пример показывает, как посчитать общее число визитов: + +```sql +SELECT sum(_sample_factor) +FROM visits +SAMPLE 10000000 +``` + +В примере ниже рассчитывается среднее время на сайте. Обратите внимание, при расчете средних значений, умножать результат на коэффициент сэмплирования не нужно. + +```sql +SELECT avg(Duration) +FROM visits +SAMPLE 10000000 +``` + +#### SAMPLE k OFFSET m {#select-sample-offset} + +Здесь `k` и `m` – числа в интервале от 0 до 1. Например, `SAMPLE 0.1 OFFSET 0.5`. Поддерживается как дробная, так и десятичная форма записи. + +При задании `SAMPLE k OFFSET m`, выборка будет сформирована из `k` доли данных со смещением на долю `m`. Примеры приведены ниже. + +**Пример 1** + +```sql +SAMPLE 1/10 +``` + +В этом примере выборка будет сформирована по 1/10 доле всех данных: + +`[++------------------]` + +**Пример 2** + +```sql +SAMPLE 1/10 OFFSET 1/2 +``` + +Здесь выборка, которая состоит из 1/10 доли данных, взята из второй половины данных. + +`[----------++--------]` ### Секция ARRAY JOIN {#select-array-join-clause} diff --git a/utils/iotest/iotest.cpp b/utils/iotest/iotest.cpp index 499eb9b464f..fac48aae00d 100644 --- a/utils/iotest/iotest.cpp +++ b/utils/iotest/iotest.cpp @@ -44,7 +44,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block { using namespace DB; - Memory direct_buf(block_size, sysconf(_SC_PAGESIZE)); + Memory<> direct_buf(block_size, sysconf(_SC_PAGESIZE)); std::vector simple_buf(block_size); char * buf; diff --git a/utils/iotest/iotest_aio.cpp b/utils/iotest/iotest_aio.cpp index 77846e1ca80..8fb7459fd3b 100644 --- a/utils/iotest/iotest_aio.cpp +++ b/utils/iotest/iotest_aio.cpp @@ -51,9 +51,9 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block AIOContext ctx; - std::vector buffers(buffers_count); + std::vector> buffers(buffers_count); for (size_t i = 0; i < buffers_count; ++i) - buffers[i] = Memory(block_size, sysconf(_SC_PAGESIZE)); + buffers[i] = Memory<>(block_size, sysconf(_SC_PAGESIZE)); drand48_data rand_data; timespec times;
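The iotest changes above only spell out the now-required empty template argument list on `Memory<>`. The `sysconf(_SC_PAGESIZE)` alignment argument these tools pass exists because they exercise O_DIRECT I/O, which needs aligned buffers. A standalone sketch of that requirement in plain POSIX terms (illustrative and assuming Linux; this is not ClickHouse's API):

``` cpp
// O_DIRECT is Linux-specific; glibc exposes it under _GNU_SOURCE.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <cstring>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

int main()
{
    const size_t page_size = static_cast<size_t>(sysconf(_SC_PAGESIZE));
    const size_t block_size = 1 << 20;

    // O_DIRECT requires the buffer address, file offset and transfer size
    // to be suitably aligned; page alignment satisfies any filesystem.
    void * buf = nullptr;
    if (posix_memalign(&buf, page_size, block_size) != 0)
        return 1;
    memset(buf, 0, block_size);

    int fd = open("/tmp/iotest_example.bin", O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd < 0)
        return 1;

    ssize_t written = pwrite(fd, buf, block_size, 0);

    close(fd);
    free(buf);
    return written == static_cast<ssize_t>(block_size) ? 0 : 1;
}
```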