Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Ivan Blinkov 2019-04-08 12:14:02 +03:00
commit e6273af4b7
61 changed files with 2681 additions and 532 deletions

View File

@ -1,7 +1,6 @@
#include "ClusterCopier.h"
#include <chrono>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
@ -13,14 +12,11 @@
#include <Poco/FileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Poco/Util/HelpFormatter.h>
#include <boost/algorithm/string.hpp>
#include <pcg_random.hpp>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <daemon/OwnPatternFormatter.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
@ -61,6 +57,7 @@
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -897,6 +894,28 @@ public:
}
}
void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
{
auto local_task_description_path = task_path + "/description";
String task_config_str;
{
ReadBufferFromFile in(task_file);
readStringUntilEOF(task_config_str, in);
}
if (task_config_str.empty())
return;
auto zookeeper = context.getZooKeeper();
zookeeper->createAncestors(local_task_description_path);
auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
if (code && force)
zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
LOG_DEBUG(log, "Task description " << ((code && !force) ? "not " : "") << "uploaded to " << local_task_description_path << " with result " << code << " ("<< zookeeper->error2string(code) << ")");
}
void reloadTaskDescription()
{
auto zookeeper = context.getZooKeeper();
@ -2107,6 +2126,10 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
.argument("task-path").binding("task-path"));
options.addOption(Poco::Util::Option("task-file", "", "path to task file for uploading in ZooKeeper to task-path")
.argument("task-file").binding("task-file"));
options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload task-file even node already exists")
.argument("task-upload-force").binding("task-upload-force"));
options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
.binding("safe-mode"));
options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with specified probability (used to test partition state recovering)")
@ -2157,6 +2180,11 @@ void ClusterCopierApp::mainImpl()
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
auto task_file = config().getString("task-file", "");
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process();
}

View File

@ -0,0 +1,85 @@
#include <AggregateFunctions/AggregateFunctionLeastSqr.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionLeastSqr(
const String & name,
const DataTypes & arguments,
const Array & params
)
{
assertNoParameters(name, params);
assertBinary(name, arguments);
const IDataType * x_arg = arguments.front().get();
WhichDataType which_x {
x_arg
};
const IDataType * y_arg = arguments.back().get();
WhichDataType which_y {
y_arg
};
#define FOR_LEASTSQR_TYPES_2(M, T) \
M(T, UInt8) \
M(T, UInt16) \
M(T, UInt32) \
M(T, UInt64) \
M(T, Int8) \
M(T, Int16) \
M(T, Int32) \
M(T, Int64) \
M(T, Float32) \
M(T, Float64)
#define FOR_LEASTSQR_TYPES(M) \
FOR_LEASTSQR_TYPES_2(M, UInt8) \
FOR_LEASTSQR_TYPES_2(M, UInt16) \
FOR_LEASTSQR_TYPES_2(M, UInt32) \
FOR_LEASTSQR_TYPES_2(M, UInt64) \
FOR_LEASTSQR_TYPES_2(M, Int8) \
FOR_LEASTSQR_TYPES_2(M, Int16) \
FOR_LEASTSQR_TYPES_2(M, Int32) \
FOR_LEASTSQR_TYPES_2(M, Int64) \
FOR_LEASTSQR_TYPES_2(M, Float32) \
FOR_LEASTSQR_TYPES_2(M, Float64)
#define DISPATCH(T1, T2) \
if (which_x.idx == TypeIndex::T1 && which_y.idx == TypeIndex::T2) \
return std::make_shared<AggregateFunctionLeastSqr<T1, T2>>( \
arguments, \
params \
);
FOR_LEASTSQR_TYPES(DISPATCH)
#undef FOR_LEASTSQR_TYPES_2
#undef FOR_LEASTSQR_TYPES
#undef DISPATCH
throw Exception(
"Illegal types ("
+ x_arg->getName() + ", " + y_arg->getName()
+ ") of arguments of aggregate function " + name
+ ", must be Native Ints, Native UInts or Floats",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
);
}
}
void registerAggregateFunctionLeastSqr(AggregateFunctionFactory & factory)
{
factory.registerFunction("leastSqr", createAggregateFunctionLeastSqr);
}
}

View File

@ -0,0 +1,195 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <limits>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
template <typename X, typename Y, typename Ret>
struct AggregateFunctionLeastSqrData final
{
size_t count = 0;
Ret sum_x = 0;
Ret sum_y = 0;
Ret sum_xx = 0;
Ret sum_xy = 0;
void add(X x, Y y)
{
count += 1;
sum_x += x;
sum_y += y;
sum_xx += x * x;
sum_xy += x * y;
}
void merge(const AggregateFunctionLeastSqrData & other)
{
count += other.count;
sum_x += other.sum_x;
sum_y += other.sum_y;
sum_xx += other.sum_xx;
sum_xy += other.sum_xy;
}
void serialize(WriteBuffer & buf) const
{
writeBinary(count, buf);
writeBinary(sum_x, buf);
writeBinary(sum_y, buf);
writeBinary(sum_xx, buf);
writeBinary(sum_xy, buf);
}
void deserialize(ReadBuffer & buf)
{
readBinary(count, buf);
readBinary(sum_x, buf);
readBinary(sum_y, buf);
readBinary(sum_xx, buf);
readBinary(sum_xy, buf);
}
Ret getK() const
{
Ret divisor = sum_xx * count - sum_x * sum_x;
if (divisor == 0)
return std::numeric_limits<Ret>::quiet_NaN();
return (sum_xy * count - sum_x * sum_y) / divisor;
}
Ret getB(Ret k) const
{
if (count == 0)
return std::numeric_limits<Ret>::quiet_NaN();
return (sum_y - k * sum_x) / count;
}
};
/// Calculates simple linear regression parameters.
/// Result is a tuple (k, b) for y = k * x + b equation, solved by least squares approximation.
template <typename X, typename Y, typename Ret = Float64>
class AggregateFunctionLeastSqr final : public IAggregateFunctionDataHelper<
AggregateFunctionLeastSqrData<X, Y, Ret>,
AggregateFunctionLeastSqr<X, Y, Ret>
>
{
public:
AggregateFunctionLeastSqr(
const DataTypes & arguments,
const Array & params
):
IAggregateFunctionDataHelper<
AggregateFunctionLeastSqrData<X, Y, Ret>,
AggregateFunctionLeastSqr<X, Y, Ret>
> {arguments, params}
{
// notice: arguments has been checked before
}
String getName() const override
{
return "leastSqr";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void add(
AggregateDataPtr place,
const IColumn ** columns,
size_t row_num,
Arena *
) const override
{
auto col_x {
static_cast<const ColumnVector<X> *>(columns[0])
};
auto col_y {
static_cast<const ColumnVector<Y> *>(columns[1])
};
X x = col_x->getData()[row_num];
Y y = col_y->getData()[row_num];
this->data(place).add(x, y);
}
void merge(
AggregateDataPtr place,
ConstAggregateDataPtr rhs, Arena *
) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(
ConstAggregateDataPtr place,
WriteBuffer & buf
) const override
{
this->data(place).serialize(buf);
}
void deserialize(
AggregateDataPtr place,
ReadBuffer & buf, Arena *
) const override
{
this->data(place).deserialize(buf);
}
DataTypePtr getReturnType() const override
{
DataTypes types {
std::make_shared<DataTypeNumber<Ret>>(),
std::make_shared<DataTypeNumber<Ret>>(),
};
Strings names {
"k",
"b",
};
return std::make_shared<DataTypeTuple>(
std::move(types),
std::move(names)
);
}
void insertResultInto(
ConstAggregateDataPtr place,
IColumn & to
) const override
{
Ret k = this->data(place).getK();
Ret b = this->data(place).getB(k);
auto & col_tuple = static_cast<ColumnTuple &>(to);
auto & col_k = static_cast<ColumnVector<Ret> &>(col_tuple.getColumn(0));
auto & col_b = static_cast<ColumnVector<Ret> &>(col_tuple.getColumn(1));
col_k.getData().push_back(k);
col_b.getData().push_back(b);
}
};
}

View File

@ -16,7 +16,6 @@
#include <Common/HashTable/HashSet.h>
#include <Common/HyperLogLogWithSmallSetOptimization.h>
#include <Common/CombinedCardinalityEstimator.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <AggregateFunctions/UniquesHashSet.h>

View File

@ -29,6 +29,7 @@ void registerAggregateFunctionsBitwise(AggregateFunctionFactory &);
void registerAggregateFunctionsBitmap(AggregateFunctionFactory &);
void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &);
void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
void registerAggregateFunctionLeastSqr(AggregateFunctionFactory &);
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorArray(AggregateFunctionCombinatorFactory &);
@ -69,6 +70,7 @@ void registerAggregateFunctions()
registerAggregateFunctionHistogram(factory);
registerAggregateFunctionRetention(factory);
registerAggregateFunctionEntropy(factory);
registerAggregateFunctionLeastSqr(factory);
}
{

View File

@ -10,7 +10,7 @@ namespace DB
/** Aligned piece of memory.
* It can only be allocated and destroyed.
* MemoryTracker is not used. It is intended for small pieces of memory.
* MemoryTracker is not used. AlignedBuffer is intended for small pieces of memory.
*/
class AlignedBuffer : private boost::noncopyable
{

View File

@ -1,190 +0,0 @@
#include <Common/Allocator.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include <cstdlib>
#include <algorithm>
#include <sys/mman.h>
#include <Core/Defines.h>
#ifdef THREAD_SANITIZER
/// Thread sanitizer does not intercept mremap. The usage of mremap will lead to false positives.
#define DISABLE_MREMAP 1
#endif
#include <common/mremap.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
}
}
/** Many modern allocators (for example, tcmalloc) do not do a mremap for realloc,
* even in case of large enough chunks of memory.
* Although this allows you to increase performance and reduce memory consumption during realloc.
* To fix this, we do mremap manually if the chunk of memory is large enough.
* The threshold (64 MB) is chosen quite large, since changing the address space is
* very slow, especially in the case of a large number of threads.
* We expect that the set of operations mmap/something to do/mremap can only be performed about 1000 times per second.
*
* PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB.
*/
#ifdef NDEBUG
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
#else
/// In debug build, use small mmap threshold to reproduce more memory stomping bugs.
/// Along with ASLR it will hopefully detect more issues than ASan.
/// The program may fail due to the limit on number of memory mappings.
static constexpr size_t MMAP_THRESHOLD = 4096;
#endif
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
template <bool clear_memory_>
void * Allocator<clear_memory_>::mmap_hint()
{
#if ALLOCATOR_ASLR
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(rng));
#else
return nullptr;
#endif
}
template <bool clear_memory_>
void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
{
CurrentMemoryTracker::alloc(size);
void * buf;
if (size >= MMAP_THRESHOLD)
{
if (alignment > MMAP_MIN_ALIGNMENT)
throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating "
+ formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS);
buf = mmap(mmap_hint(), size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
/// No need for zero-fill, because mmap guarantees it.
}
else
{
if (alignment <= MALLOC_MIN_ALIGNMENT)
{
if (clear_memory)
buf = ::calloc(size, 1);
else
buf = ::malloc(size);
if (nullptr == buf)
DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
{
buf = nullptr;
int res = posix_memalign(&buf, alignment, size);
if (0 != res)
DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
if (clear_memory)
memset(buf, 0, size);
}
}
return buf;
}
template <bool clear_memory_>
void Allocator<clear_memory_>::free(void * buf, size_t size)
{
if (size >= MMAP_THRESHOLD)
{
if (0 != munmap(buf, size))
DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
}
else
{
::free(buf);
}
CurrentMemoryTracker::free(size);
}
template <bool clear_memory_>
void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new_size, size_t alignment)
{
if (old_size == new_size)
{
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
}
else if (old_size < MMAP_THRESHOLD && new_size < MMAP_THRESHOLD && alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
if (clear_memory && new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
}
else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On apple and freebsd self-implemented mremap used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
else
{
/// All other cases that requires a copy. MemoryTracker is called inside 'alloc', 'free' methods.
void * new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
free(buf, old_size);
buf = new_buf;
}
return buf;
}
/// Explicit template instantiations.
template class Allocator<true>;
template class Allocator<false>;

View File

@ -10,11 +10,88 @@
#define ALLOCATOR_ASLR 1
#endif
#if ALLOCATOR_ASLR
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include <cstdlib>
#include <algorithm>
#include <sys/mman.h>
#include <Core/Defines.h>
#ifdef THREAD_SANITIZER
/// Thread sanitizer does not intercept mremap. The usage of mremap will lead to false positives.
#define DISABLE_MREMAP 1
#endif
#include <common/mremap.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
/** Many modern allocators (for example, tcmalloc) do not do a mremap for realloc,
* even in case of large enough chunks of memory.
* Although this allows you to increase performance and reduce memory consumption during realloc.
* To fix this, we do mremap manually if the chunk of memory is large enough.
* The threshold (64 MB) is chosen quite large, since changing the address space is
* very slow, especially in the case of a large number of threads.
* We expect that the set of operations mmap/something to do/mremap can only be performed about 1000 times per second.
*
* PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB.
*/
#ifdef NDEBUG
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
#else
/// In debug build, use small mmap threshold to reproduce more memory stomping bugs.
/// Along with ASLR it will hopefully detect more issues than ASan.
/// The program may fail due to the limit on number of memory mappings.
static constexpr size_t MMAP_THRESHOLD = 4096;
#endif
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
}
}
namespace AllocatorHints
{
struct DefaultHint
{
void * mmap_hint()
{
return nullptr;
}
};
struct RandomHint
{
void * mmap_hint()
{
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(rng));
}
private:
pcg64 rng{randomSeed()};
};
}
/** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena.
* Also used in hash tables.
@ -23,31 +100,126 @@
* - passing the size into the `free` method;
* - by the presence of the `alignment` argument;
* - the possibility of zeroing memory (used in hash tables);
* - hint class for mmap
* - mmap_threshold for using mmap less or more
*/
template <bool clear_memory_>
class Allocator
template <bool clear_memory_, typename Hint, size_t mmap_threshold>
class AllocatorWithHint : Hint
{
#if ALLOCATOR_ASLR
private:
pcg64 rng{randomSeed()};
#endif
void * mmap_hint();
protected:
static constexpr bool clear_memory = clear_memory_;
public:
/// Allocate memory range.
void * alloc(size_t size, size_t alignment = 0);
void * alloc(size_t size, size_t alignment = 0)
{
CurrentMemoryTracker::alloc(size);
void * buf;
if (size >= mmap_threshold)
{
if (alignment > MMAP_MIN_ALIGNMENT)
throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating "
+ formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS);
buf = mmap(Hint::mmap_hint(), size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
/// No need for zero-fill, because mmap guarantees it.
}
else
{
if (alignment <= MALLOC_MIN_ALIGNMENT)
{
if constexpr (clear_memory)
buf = ::calloc(size, 1);
else
buf = ::malloc(size);
if (nullptr == buf)
DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
{
buf = nullptr;
int res = posix_memalign(&buf, alignment, size);
if (0 != res)
DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
if (clear_memory)
memset(buf, 0, size);
}
}
return buf;
}
/// Free memory range.
void free(void * buf, size_t size);
void free(void * buf, size_t size)
{
if (size >= mmap_threshold)
{
if (0 != munmap(buf, size))
DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
}
else
{
::free(buf);
}
CurrentMemoryTracker::free(size);
}
/** Enlarge memory range.
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
*/
void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0);
void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
{
if (old_size == new_size)
{
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
}
else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
if (clear_memory && new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
}
else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On apple and freebsd self-implemented mremap used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
else
{
/// All other cases that requires a copy. MemoryTracker is called inside 'alloc', 'free' methods.
void * new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
free(buf, old_size);
buf = new_buf;
}
return buf;
}
protected:
static constexpr size_t getStackThreshold()
@ -56,6 +228,13 @@ protected:
}
};
#if ALLOCATOR_ASLR
template <bool clear_memory>
using Allocator = AllocatorWithHint<clear_memory, AllocatorHints::RandomHint, MMAP_THRESHOLD>;
#else
template <bool clear_memory>
using Allocator = AllocatorWithHint<clear_memory, AllocatorHints::DefaultHint, MMAP_THRESHOLD>;
#endif
/** When using AllocatorWithStackMemory, located on the stack,
* GCC 4.9 mistakenly assumes that we can call `free` from a pointer to the stack.

View File

@ -49,7 +49,7 @@ private:
ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
begin = reinterpret_cast<char *>(Allocator::alloc(size_));
begin = reinterpret_cast<char *>(Allocator<false>::alloc(size_));
pos = begin;
end = begin + size_ - pad_right;
prev = prev_;
@ -57,7 +57,7 @@ private:
~Chunk()
{
Allocator::free(begin, size());
Allocator<false>::free(begin, size());
if (prev)
delete prev;

View File

@ -55,7 +55,7 @@ public:
char * alloc(const size_t size)
{
if (size > max_fixed_block_size)
return static_cast<char *>(Allocator::alloc(size));
return static_cast<char *>(Allocator<false>::alloc(size));
/// find list of required size
const auto list_idx = findFreeListIndex(size);
@ -76,7 +76,7 @@ public:
void free(char * ptr, const size_t size)
{
if (size > max_fixed_block_size)
return Allocator::free(ptr, size);
return Allocator<false>::free(ptr, size);
/// find list of required size
const auto list_idx = findFreeListIndex(size);

View File

@ -1,52 +0,0 @@
#pragma once
#include <vector>
//#include <Common/PODArray.h>
namespace DB
{
/**
* This class is intended to push sortable data into.
* When looking up values the container ensures that it is sorted for log(N) lookup
*
* Note, this is only efficient when the insertions happen in one stage, followed by all retrievals
* This way the data only gets sorted once.
*/
template <typename T>
class SortedLookupPODArray
{
public:
using Base = std::vector<T>;
//using Base = PaddedPODArray<T>;
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)
{
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
sorted = false;
}
typename Base::const_iterator upper_bound(const T & k)
{
if (!sorted)
sort();
return std::upper_bound(array.cbegin(), array.cend(), k);
}
typename Base::const_iterator cbegin() const { return array.cbegin(); }
typename Base::const_iterator cend() const { return array.cend(); }
private:
Base array;
bool sorted = false;
void sort()
{
std::sort(array.begin(), array.end());
sorted = true;
}
};
}

View File

@ -35,7 +35,7 @@ bool CachedCompressedReadBuffer::nextImpl()
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
if (!owned_cell || !codec)
if (!owned_cell)
{
/// If not, read it from the file.
initInput();
@ -49,21 +49,22 @@ bool CachedCompressedReadBuffer::nextImpl()
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
owned_cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer();
owned_cell->data.resize(size_decompressed + owned_cell->additional_bytes);
decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
/// Put data into cache.
cache->set(key, owned_cell);
}
/// Put data into cache.
/// NOTE: Even if we don't read anything (compressed_size == 0)
/// because we can reuse this information and don't reopen file in future
cache->set(key, owned_cell);
}
if (owned_cell->data.size() == 0)
{
owned_cell = nullptr;
return false;
}
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - codec->getAdditionalSizeAtTheEndOfBuffer());
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);
file_pos += owned_cell->compressed_size;

View File

@ -23,20 +23,21 @@ namespace DB
class TaskNotification final : public Poco::Notification
{
public:
explicit TaskNotification(const BackgroundSchedulePool::TaskInfoPtr & task) : task(task) {}
explicit TaskNotification(const BackgroundSchedulePoolTaskInfoPtr & task) : task(task) {}
void execute() { task->execute(); }
private:
BackgroundSchedulePool::TaskInfoPtr task;
BackgroundSchedulePoolTaskInfoPtr task;
};
BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_)
: pool(pool_) , log_name(log_name_) , function(function_)
BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo(
BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_)
: pool(pool_), log_name(log_name_), function(function_)
{
}
bool BackgroundSchedulePool::TaskInfo::schedule()
bool BackgroundSchedulePoolTaskInfo::schedule()
{
std::lock_guard lock(schedule_mutex);
@ -47,7 +48,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
return true;
}
bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms)
{
std::lock_guard lock(schedule_mutex);
@ -58,7 +59,7 @@ bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
return true;
}
void BackgroundSchedulePool::TaskInfo::deactivate()
void BackgroundSchedulePoolTaskInfo::deactivate()
{
std::lock_guard lock_exec(exec_mutex);
std::lock_guard lock_schedule(schedule_mutex);
@ -73,13 +74,13 @@ void BackgroundSchedulePool::TaskInfo::deactivate()
pool.cancelDelayedTask(shared_from_this(), lock_schedule);
}
void BackgroundSchedulePool::TaskInfo::activate()
void BackgroundSchedulePoolTaskInfo::activate()
{
std::lock_guard lock(schedule_mutex);
deactivated = false;
}
bool BackgroundSchedulePool::TaskInfo::activateAndSchedule()
bool BackgroundSchedulePoolTaskInfo::activateAndSchedule()
{
std::lock_guard lock(schedule_mutex);
@ -91,7 +92,7 @@ bool BackgroundSchedulePool::TaskInfo::activateAndSchedule()
return true;
}
void BackgroundSchedulePool::TaskInfo::execute()
void BackgroundSchedulePoolTaskInfo::execute()
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
@ -131,7 +132,7 @@ void BackgroundSchedulePool::TaskInfo::execute()
}
}
void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
void BackgroundSchedulePoolTaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
{
scheduled = true;
@ -145,7 +146,7 @@ void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex>
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
Coordination::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
{
return [t = shared_from_this()](const Coordination::WatchResponse &)
{

View File

@ -20,6 +20,8 @@ namespace DB
{
class TaskNotification;
class BackgroundSchedulePoolTaskInfo;
class BackgroundSchedulePoolTaskHolder;
/** Executes functions scheduled at a specific point in time.
@ -35,84 +37,14 @@ class TaskNotification;
class BackgroundSchedulePool
{
public:
class TaskInfo;
friend class BackgroundSchedulePoolTaskInfo;
using TaskInfo = BackgroundSchedulePoolTaskInfo;
using TaskInfoPtr = std::shared_ptr<TaskInfo>;
using TaskFunc = std::function<void()>;
using TaskHolder = BackgroundSchedulePoolTaskHolder;
using DelayedTasks = std::multimap<Poco::Timestamp, TaskInfoPtr>;
class TaskInfo : public std::enable_shared_from_this<TaskInfo>, private boost::noncopyable
{
public:
TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_);
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
bool schedule();
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback();
private:
friend class TaskNotification;
friend class BackgroundSchedulePool;
void execute();
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
BackgroundSchedulePool & pool;
std::string log_name;
TaskFunc function;
std::mutex exec_mutex;
std::mutex schedule_mutex;
/// Invariants:
/// * If deactivated is true then scheduled, delayed and executing are all false.
/// * scheduled and delayed cannot be true at the same time.
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
/// If the task is scheduled with delay, points to element of delayed_tasks.
DelayedTasks::iterator iterator;
};
class TaskHolder
{
public:
TaskHolder() = default;
explicit TaskHolder(const TaskInfoPtr & task_info_) : task_info(task_info_) {}
TaskHolder(const TaskHolder & other) = delete;
TaskHolder(TaskHolder && other) noexcept = default;
TaskHolder & operator=(const TaskHolder & other) noexcept = delete;
TaskHolder & operator=(TaskHolder && other) noexcept = default;
~TaskHolder()
{
if (task_info)
task_info->deactivate();
}
TaskInfo * operator->() { return task_info.get(); }
const TaskInfo * operator->() const { return task_info.get(); }
private:
TaskInfoPtr task_info;
};
TaskHolder createTask(const std::string & log_name, const TaskFunc & function);
size_t getNumberOfThreads() const { return size; }
@ -153,4 +85,81 @@ private:
void attachToThreadGroup();
};
class BackgroundSchedulePoolTaskInfo : public std::enable_shared_from_this<BackgroundSchedulePoolTaskInfo>, private boost::noncopyable
{
public:
BackgroundSchedulePoolTaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_);
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
bool schedule();
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback();
private:
friend class TaskNotification;
friend class BackgroundSchedulePool;
void execute();
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
BackgroundSchedulePool & pool;
std::string log_name;
BackgroundSchedulePool::TaskFunc function;
std::mutex exec_mutex;
std::mutex schedule_mutex;
/// Invariants:
/// * If deactivated is true then scheduled, delayed and executing are all false.
/// * scheduled and delayed cannot be true at the same time.
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
/// If the task is scheduled with delay, points to element of delayed_tasks.
BackgroundSchedulePool::DelayedTasks::iterator iterator;
};
using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePoolTaskInfo>;
class BackgroundSchedulePoolTaskHolder
{
public:
BackgroundSchedulePoolTaskHolder() = default;
explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {}
BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;
BackgroundSchedulePoolTaskHolder(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
BackgroundSchedulePoolTaskHolder & operator=(const BackgroundSchedulePoolTaskHolder & other) noexcept = delete;
BackgroundSchedulePoolTaskHolder & operator=(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
~BackgroundSchedulePoolTaskHolder()
{
if (task_info)
task_info->deactivate();
}
BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }
private:
BackgroundSchedulePoolTaskInfoPtr task_info;
};
}

View File

@ -5,8 +5,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Common/MemoryTracker.h>
#include <Poco/Ext/ThreadNumber.h>
namespace CurrentMetrics

View File

@ -43,6 +43,9 @@ struct BlockIO
BlockIO & operator= (const BlockIO & rhs)
{
if (this == &rhs)
return *this;
out.reset();
in.reset();
process_list_entry.reset();

View File

@ -1,7 +1,6 @@
#include <future>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <Common/CurrentThread.h>

View File

@ -8,8 +8,6 @@
#include <condition_variable>
class MemoryTracker;
namespace DB
{

View File

@ -11,7 +11,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
namespace
{
class DataTypeDomanIPv4 : public DataTypeDomainWithSimpleSerialization
class DataTypeDomainIPv4 : public DataTypeDomainWithSimpleSerialization
{
public:
const char * getName() const override
@ -63,7 +63,7 @@ public:
}
};
class DataTypeDomanIPv6 : public DataTypeDomainWithSimpleSerialization
class DataTypeDomainIPv6 : public DataTypeDomainWithSimpleSerialization
{
public:
const char * getName() const override
@ -111,8 +111,8 @@ public:
void registerDataTypeDomainIPv4AndIPv6(DataTypeFactory & factory)
{
factory.registerDataTypeDomain("UInt32", std::make_unique<DataTypeDomanIPv4>());
factory.registerDataTypeDomain("FixedString(16)", std::make_unique<DataTypeDomanIPv6>());
factory.registerDataTypeDomain("UInt32", std::make_unique<DataTypeDomainIPv4>());
factory.registerDataTypeDomain("FixedString(16)", std::make_unique<DataTypeDomainIPv6>());
}
} // namespace DB

View File

@ -690,10 +690,9 @@ void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
};
if (!settings.continuous_reading)
{
low_cardinality_state->num_pending_rows = 0;
if (!settings.continuous_reading)
{
/// Remember in state that some granules were skipped and we need to update dictionary.
low_cardinality_state->need_update_dictionary = true;
}

View File

@ -0,0 +1,306 @@
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringOrArrayToT.h>
#include <cstring>
#ifdef __SSE4_1__
# include <emmintrin.h>
# include <smmintrin.h>
# include <tmmintrin.h>
#endif
namespace DB
{
struct ValidUTF8Impl
{
/*
* inspired by https://github.com/cyb70289/utf8/
* http://www.unicode.org/versions/Unicode6.0.0/ch03.pdf - page 94
*
* Table 3-7. Well-Formed UTF-8 Byte Sequences
*
* +--------------------+------------+-------------+------------+-------------+
* | Code Points | First Byte | Second Byte | Third Byte | Fourth Byte |
* +--------------------+------------+-------------+------------+-------------+
* | U+0000..U+007F | 00..7F | | | |
* +--------------------+------------+-------------+------------+-------------+
* | U+0080..U+07FF | C2..DF | 80..BF | | |
* +--------------------+------------+-------------+------------+-------------+
* | U+0800..U+0FFF | E0 | A0..BF | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+1000..U+CFFF | E1..EC | 80..BF | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+D000..U+D7FF | ED | 80..9F | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+E000..U+FFFF | EE..EF | 80..BF | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+10000..U+3FFFF | F0 | 90..BF | 80..BF | 80..BF |
* +--------------------+------------+-------------+------------+-------------+
* | U+40000..U+FFFFF | F1..F3 | 80..BF | 80..BF | 80..BF |
* +--------------------+------------+-------------+------------+-------------+
* | U+100000..U+10FFFF | F4 | 80..8F | 80..BF | 80..BF |
* +--------------------+------------+-------------+------------+-------------+
*/
static inline UInt8 isValidUTF8Naive(const UInt8 * data, UInt64 len)
{
while (len)
{
int bytes;
const UInt8 byte1 = data[0];
/* 00..7F */
if (byte1 <= 0x7F)
{
bytes = 1;
}
/* C2..DF, 80..BF */
else if (len >= 2 && byte1 >= 0xC2 && byte1 <= 0xDF && static_cast<Int8>(data[1]) <= static_cast<Int8>(0xBF))
{
bytes = 2;
}
else if (len >= 3)
{
const UInt8 byte2 = data[1];
bool byte2_ok = static_cast<Int8>(byte2) <= static_cast<Int8>(0xBF);
bool byte3_ok = static_cast<Int8>(data[2]) <= static_cast<Int8>(0xBF);
if (byte2_ok && byte3_ok &&
/* E0, A0..BF, 80..BF */
((byte1 == 0xE0 && byte2 >= 0xA0) ||
/* E1..EC, 80..BF, 80..BF */
(byte1 >= 0xE1 && byte1 <= 0xEC) ||
/* ED, 80..9F, 80..BF */
(byte1 == 0xED && byte2 <= 0x9F) ||
/* EE..EF, 80..BF, 80..BF */
(byte1 >= 0xEE && byte1 <= 0xEF)))
{
bytes = 3;
}
else if (len >= 4)
{
bool byte4_ok = static_cast<Int8>(data[3]) <= static_cast<Int8>(0xBF);
if (byte2_ok && byte3_ok && byte4_ok &&
/* F0, 90..BF, 80..BF, 80..BF */
((byte1 == 0xF0 && byte2 >= 0x90) ||
/* F1..F3, 80..BF, 80..BF, 80..BF */
(byte1 >= 0xF1 && byte1 <= 0xF3) ||
/* F4, 80..8F, 80..BF, 80..BF */
(byte1 == 0xF4 && byte2 <= 0x8F)))
{
bytes = 4;
}
else
{
return false;
}
}
else
{
return false;
}
}
else
{
return false;
}
len -= bytes;
data += bytes;
}
return true;
}
#ifndef __SSE4_1__
static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len) { return isValidUTF8Naive(data, len); }
#else
static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len)
{
/*
* Map high nibble of "First Byte" to legal character length minus 1
* 0x00 ~ 0xBF --> 0
* 0xC0 ~ 0xDF --> 1
* 0xE0 ~ 0xEF --> 2
* 0xF0 ~ 0xFF --> 3
*/
const __m128i first_len_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 3);
/* Map "First Byte" to 8-th item of range table (0xC2 ~ 0xF4) */
const __m128i first_range_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8);
/*
* Range table, map range index to min and max values
*/
const __m128i range_min_tbl
= _mm_setr_epi8(0x00, 0x80, 0x80, 0x80, 0xA0, 0x80, 0x90, 0x80, 0xC2, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F);
const __m128i range_max_tbl
= _mm_setr_epi8(0x7F, 0xBF, 0xBF, 0xBF, 0xBF, 0x9F, 0xBF, 0x8F, 0xF4, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80);
/*
* Tables for fast handling of four special First Bytes(E0,ED,F0,F4), after
* which the Second Byte are not 80~BF. It contains "range index adjustment".
* +------------+---------------+------------------+----------------+
* | First Byte | original range| range adjustment | adjusted range |
* +------------+---------------+------------------+----------------+
* | E0 | 2 | 2 | 4 |
* +------------+---------------+------------------+----------------+
* | ED | 2 | 3 | 5 |
* +------------+---------------+------------------+----------------+
* | F0 | 3 | 3 | 6 |
* +------------+---------------+------------------+----------------+
* | F4 | 4 | 4 | 8 |
* +------------+---------------+------------------+----------------+
*/
/* index1 -> E0, index14 -> ED */
const __m128i df_ee_tbl = _mm_setr_epi8(0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0);
/* index1 -> F0, index5 -> F4 */
const __m128i ef_fe_tbl = _mm_setr_epi8(0, 3, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
__m128i prev_input = _mm_set1_epi8(0);
__m128i prev_first_len = _mm_set1_epi8(0);
__m128i error = _mm_set1_epi8(0);
auto check_packed = [&](__m128i input) noexcept
{
/* high_nibbles = input >> 4 */
const __m128i high_nibbles = _mm_and_si128(_mm_srli_epi16(input, 4), _mm_set1_epi8(0x0F));
/* first_len = legal character length minus 1 */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* first_len = first_len_tbl[high_nibbles] */
__m128i first_len = _mm_shuffle_epi8(first_len_tbl, high_nibbles);
/* First Byte: set range index to 8 for bytes within 0xC0 ~ 0xFF */
/* range = first_range_tbl[high_nibbles] */
__m128i range = _mm_shuffle_epi8(first_range_tbl, high_nibbles);
/* Second Byte: set range index to first_len */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* range |= (first_len, prev_first_len) << 1 byte */
range = _mm_or_si128(range, _mm_alignr_epi8(first_len, prev_first_len, 15));
/* Third Byte: set range index to saturate_sub(first_len, 1) */
/* 0 for 00~7F, 0 for C0~DF, 1 for E0~EF, 2 for F0~FF */
__m128i tmp1;
__m128i tmp2;
/* tmp1 = saturate_sub(first_len, 1) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(1));
/* tmp2 = saturate_sub(prev_first_len, 1) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(1));
/* range |= (tmp1, tmp2) << 2 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 14));
/* Fourth Byte: set range index to saturate_sub(first_len, 2) */
/* 0 for 00~7F, 0 for C0~DF, 0 for E0~EF, 1 for F0~FF */
/* tmp1 = saturate_sub(first_len, 2) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(2));
/* tmp2 = saturate_sub(prev_first_len, 2) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(2));
/* range |= (tmp1, tmp2) << 3 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 13));
/*
* Now we have below range indices caluclated
* Correct cases:
* - 8 for C0~FF
* - 3 for 1st byte after F0~FF
* - 2 for 1st byte after E0~EF or 2nd byte after F0~FF
* - 1 for 1st byte after C0~DF or 2nd byte after E0~EF or
* 3rd byte after F0~FF
* - 0 for others
* Error cases:
* 9,10,11 if non ascii First Byte overlaps
* E.g., F1 80 C2 90 --> 8 3 10 2, where 10 indicates error
*/
/* Adjust Second Byte range for special First Bytes(E0,ED,F0,F4) */
/* Overlaps lead to index 9~15, which are illegal in range table */
__m128i shift1, pos, range2;
/* shift1 = (input, prev_input) << 1 byte */
shift1 = _mm_alignr_epi8(input, prev_input, 15);
pos = _mm_sub_epi8(shift1, _mm_set1_epi8(0xEF));
/*
* shift1: | EF F0 ... FE | FF 00 ... ... DE | DF E0 ... EE |
* pos: | 0 1 15 | 16 17 239| 240 241 255|
* pos-240: | 0 0 0 | 0 0 0 | 0 1 15 |
* pos+112: | 112 113 127| >= 128 | >= 128 |
*/
tmp1 = _mm_subs_epu8(pos, _mm_set1_epi8(240));
range2 = _mm_shuffle_epi8(df_ee_tbl, tmp1);
tmp2 = _mm_adds_epu8(pos, _mm_set1_epi8(112));
range2 = _mm_add_epi8(range2, _mm_shuffle_epi8(ef_fe_tbl, tmp2));
range = _mm_add_epi8(range, range2);
/* Load min and max values per calculated range index */
__m128i minv = _mm_shuffle_epi8(range_min_tbl, range);
__m128i maxv = _mm_shuffle_epi8(range_max_tbl, range);
/* Check value range */
error = _mm_or_si128(error, _mm_cmplt_epi8(input, minv));
error = _mm_or_si128(error, _mm_cmpgt_epi8(input, maxv));
prev_input = input;
prev_first_len = first_len;
data += 16;
len -= 16;
};
while (len >= 16)
check_packed(_mm_loadu_si128(reinterpret_cast<const __m128i *>(data)));
/// 0 <= len <= 15 for now. Reading data from data - 1 because of right padding of 15 and left padding
/// Then zero some bytes from the unknown memory and check again.
alignas(16) char buf[32];
_mm_store_si128(reinterpret_cast<__m128i *>(buf), _mm_loadu_si128(reinterpret_cast<const __m128i *>(data - 1)));
memset(buf + len + 1, 0, 16);
check_packed(_mm_loadu_si128(reinterpret_cast<__m128i *>(buf + 1)));
/* Reduce error vector, error_reduced = 0xFFFF if error == 0 */
return _mm_movemask_epi8(_mm_cmpeq_epi8(error, _mm_set1_epi8(0))) == 0xFFFF;
}
#endif
static constexpr bool is_fixed_to_constant = false;
static void vector(const ColumnString::Chars & data, const ColumnString::Offsets & offsets, PaddedPODArray<UInt8> & res)
{
size_t size = offsets.size();
size_t prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
res[i] = isValidUTF8(data.data() + prev_offset, offsets[i] - 1 - prev_offset);
prev_offset = offsets[i];
}
}
static void vector_fixed_to_constant(const ColumnString::Chars & /*data*/, size_t /*n*/, UInt8 & /*res*/) {}
static void vector_fixed_to_vector(const ColumnString::Chars & data, size_t n, PaddedPODArray<UInt8> & res)
{
size_t size = data.size() / n;
for (size_t i = 0; i < size; ++i)
res[i] = isValidUTF8(data.data() + i * n, n);
}
static void array(const ColumnString::Offsets &, PaddedPODArray<UInt8> &)
{
throw Exception("Cannot apply function isValidUTF8 to Array argument", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
};
struct NameValidUTF8
{
static constexpr auto name = "isValidUTF8";
};
using FunctionValidUTF8 = FunctionStringOrArrayToT<ValidUTF8Impl, NameValidUTF8, UInt8>;
void registerFunctionValidUTF8(FunctionFactory & factory)
{
factory.registerFunction<FunctionValidUTF8>();
}
}

View File

@ -9,6 +9,7 @@ void registerFunctionEmpty(FunctionFactory &);
void registerFunctionNotEmpty(FunctionFactory &);
void registerFunctionLength(FunctionFactory &);
void registerFunctionLengthUTF8(FunctionFactory &);
void registerFunctionValidUTF8(FunctionFactory &);
void registerFunctionLower(FunctionFactory &);
void registerFunctionUpper(FunctionFactory &);
void registerFunctionLowerUTF8(FunctionFactory &);
@ -36,6 +37,7 @@ void registerFunctionsString(FunctionFactory & factory)
registerFunctionNotEmpty(factory);
registerFunctionLength(factory);
registerFunctionLengthUTF8(factory);
registerFunctionValidUTF8(factory);
registerFunctionLower(factory);
registerFunctionUpper(factory);
registerFunctionLowerUTF8(factory);

View File

@ -24,7 +24,8 @@ namespace DB
* Differs in that is doesn't do unneeded memset. (And also tries to do as little as possible.)
* Also allows to allocate aligned piece of memory (to use with O_DIRECT, for example).
*/
struct Memory : boost::noncopyable, Allocator<false>
template <typename Allocator = Allocator<false>>
struct Memory : boost::noncopyable, Allocator
{
/// Padding is needed to allow usage of 'memcpySmallAllowReadWriteOverflow15' function with this buffer.
static constexpr size_t pad_right = 15;
@ -136,7 +137,7 @@ template <typename Base>
class BufferWithOwnMemory : public Base
{
protected:
Memory memory;
Memory<> memory;
public:
/// If non-nullptr 'existing_memory' is passed, then buffer will not create its own memory and will use existing_memory without ownership.
BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)

View File

@ -20,8 +20,9 @@ namespace DB
struct UncompressedCacheCell
{
Memory data;
Memory<> data;
size_t compressed_size;
UInt32 additional_bytes;
};
struct UncompressedSizeWeightFunction

View File

@ -1,7 +1,7 @@
#include "DNSCacheUpdater.h"
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ProfileEvents.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
@ -16,8 +16,6 @@ namespace ProfileEvents
namespace DB
{
using BackgroundProcessingPoolTaskInfo = BackgroundProcessingPool::TaskInfo;
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
@ -56,18 +54,15 @@ static bool isNetworkError()
DNSCacheUpdater::DNSCacheUpdater(Context & context_)
: context(context_), pool(context_.getBackgroundPool())
: context(context_), pool(context_.getSchedulePool())
{
task_handle = pool.addTask([this] () { return run(); });
task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); });
}
BackgroundProcessingPoolTaskResult DNSCacheUpdater::run()
void DNSCacheUpdater::run()
{
/// TODO: Ensusre that we get global counter (not thread local)
auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache
&& time(nullptr) > last_update_time + min_update_period_seconds)
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache)
{
try
{
@ -77,32 +72,18 @@ BackgroundProcessingPoolTaskResult DNSCacheUpdater::run()
context.reloadClusterConfig();
last_num_network_erros = num_current_network_exceptions;
last_update_time = time(nullptr);
return BackgroundProcessingPoolTaskResult::SUCCESS;
task_handle->scheduleAfter(min_update_period_seconds * 1000);
return;
}
catch (...)
{
/// Do not increment ProfileEvents::NetworkErrors twice
if (isNetworkError())
return BackgroundProcessingPoolTaskResult::ERROR;
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// According to BackgroundProcessingPool logic, if task has done work, it could be executed again immediately.
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
task_handle->scheduleAfter(10 * 1000);
}
DNSCacheUpdater::~DNSCacheUpdater()
{
if (task_handle)
pool.removeTask(task_handle);
task_handle.reset();
}
bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded()
{
if (isNetworkError())

View File

@ -4,35 +4,31 @@
#include <ctime>
#include <cstddef>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{
class Context;
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
enum class BackgroundProcessingPoolTaskResult;
/// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased
class DNSCacheUpdater
{
public:
explicit DNSCacheUpdater(Context & context);
~DNSCacheUpdater();
/// Checks if it is a network error and increments ProfileEvents::NetworkErrors
static bool incrementNetworkErrorEventsIfNeeded();
private:
BackgroundProcessingPoolTaskResult run();
void run();
Context & context;
BackgroundProcessingPool & pool;
std::shared_ptr<BackgroundProcessingPoolTaskInfo> task_handle;
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder task_handle;
size_t last_num_network_erros = 0;
time_t last_update_time = 0;
static constexpr size_t min_errors_to_update_cache = 3;
static constexpr time_t min_update_period_seconds = 45;

View File

@ -19,6 +19,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/IColumn.h>
@ -406,7 +407,7 @@ void ExpressionAnalyzer::getAggregates(const ASTPtr & ast, ExpressionActionsPtr
getRootActions(arguments[i], true, actions);
const std::string & name = arguments[i]->getColumnName();
types[i] = actions->getSampleBlock().getByName(name).type;
types[i] = recursiveRemoveLowCardinality(actions->getSampleBlock().getByName(name).type);
aggregate.argument_names[i] = name;
}
@ -974,19 +975,11 @@ void ExpressionAnalyzer::collectUsedColumns()
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet required = columns_context.requiredColumns();
NameSet source_column_names;
for (const auto & column : source_columns)
source_column_names.insert(column.name);
#if 0
std::cerr << "Query: " << query << std::endl;
std::cerr << "CTX: " << columns_context << std::endl;
std::cerr << "source_columns: ";
for (const auto & name : source_columns)
std::cerr << "'" << name.name << "' ";
std::cerr << "required: ";
for (const auto & pr : required)
std::cerr << "'" << pr.first << "' ";
std::cerr << std::endl;
#endif
NameSet required = columns_context.requiredColumns();
if (columns_context.has_table_join)
{
@ -1013,10 +1006,10 @@ void ExpressionAnalyzer::collectUsedColumns()
}
}
NameSet array_join_sources;
if (columns_context.has_array_join)
{
/// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
NameSet array_join_sources;
for (const auto & result_source : syntax->array_join_result_to_source)
array_join_sources.insert(result_source.second);
@ -1063,15 +1056,39 @@ void ExpressionAnalyzer::collectUsedColumns()
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss << "query: '" << query << "' ";
ss << columns_context;
ss << "source_columns: ";
for (const auto & name : source_columns)
ss << "'" << name.name << "' ";
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";
ss << " while processing query: '" << query << "'";
throw Exception("Unknown identifier: " + *unknown_required_source_columns.begin()
+ (select_query && !select_query->tables ? ". Note that there are no tables (FROM clause) in your query" : "")
+ ", context: " + ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
ss << ", required columns:";
for (const auto & name : columns_context.requiredColumns())
ss << " '" << name << "'";
if (!source_column_names.empty())
{
ss << ", source columns:";
for (const auto & name : source_column_names)
ss << " '" << name << "'";
}
else
ss << ", no source columns";
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzedJoin().available_joined_columns)
ss << " '" << column.name_and_type.name << "'";
}
if (!array_join_sources.empty())
{
ss << ", arrayJoin columns:";
for (const auto & name : array_join_sources)
ss << " '" << name << "'";
}
throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
}

View File

@ -8,7 +8,6 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
#include <Common/LRUCache.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>

View File

@ -422,8 +422,8 @@ namespace
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename Map::mapped_type();
time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i);
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
time_series_map->insert(join.getAsofType(), asof_column, stored_block, i);
}
};
@ -511,10 +511,7 @@ void Join::prepareBlockListStructure(Block & stored_block)
for (const auto & name : key_names_right)
{
if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
{
LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name);
break; // this is the last column so break is OK
}
if (!erased.count(name))
stored_block.erase(stored_block.getPositionByName(name));
@ -556,8 +553,6 @@ bool Join::insertFromBlock(const Block & block)
prepareBlockListStructure(*stored_block);
LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure());
size_t size = stored_block->columns();
/// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
@ -720,7 +715,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i))
if (const RowRef * found = mapped.findAsof(join.getAsofType(), asof_column, i))
{
filter[i] = 1;
mapped.setUsed();
@ -1096,7 +1091,6 @@ void Join::joinGet(Block & block, const String & column_name) const
void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const
{
std::shared_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
checkTypesOfKeys(block, key_names_left, sample_block_with_keys);

View File

@ -132,8 +132,6 @@ public:
ASTTableJoin::Kind getKind() const { return kind; }
AsofRowRefs::Type getAsofType() const { return *asof_type; }
AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; }
const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; }
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
@ -369,7 +367,6 @@ private:
private:
Type type = Type::EMPTY;
std::optional<AsofRowRefs::Type> asof_type;
AsofRowRefs::LookupLists asof_lookup_lists;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);

View File

@ -30,51 +30,53 @@ void callWithType(AsofRowRefs::Type which, F && f)
} // namespace
void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num)
AsofRowRefs::AsofRowRefs(Type type)
{
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto entry = Entry<T>(key, RowRef(block, row_num));
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
{
lookup_data.lookups.push_back(Lookups());
lookup_data.lookups.back() = LookupType();
lookups = &lookup_data.lookups.back();
}
std::get<LookupType>(*lookups).insert(entry);
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
lookups = std::make_unique<LookupType>();
};
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const
void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num)
{
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupPtr = typename Entry<T>::LookupPtr;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto entry = Entry<T>(key, RowRef(block, row_num));
std::get<LookupPtr>(lookups)->insert(entry);
};
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(Type type, const IColumn * asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
using LookupPtr = typename Entry<T>::LookupPtr;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto & typed_lookup = std::get<LookupPtr>(lookups);
std::lock_guard<std::mutex> lock(lookup_data.mutex);
// The first thread that calls upper_bound ensures that the data is sorted
auto it = typed_lookup->upper_bound(Entry<T>(key));
if (!lookups)
return;
auto & typed_lookup = std::get<LookupType>(*lookups);
auto it = typed_lookup.upper_bound(Entry<T>(key));
if (it != typed_lookup.cbegin())
// cbegin() is safe to call now because the array is immutable after sorting
// hence the pointer to a entry can be returned
if (it != typed_lookup->cbegin())
out = &((--it)->row_ref);
};

View File

@ -1,12 +1,12 @@
#pragma once
#include <Columns/IColumn.h>
#include <Common/SortedLookupPODArray.h>
#include <optional>
#include <variant>
#include <list>
#include <mutex>
#include <algorithm>
namespace DB
{
@ -32,14 +32,70 @@ struct RowRefList : RowRef
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
/**
* This class is intended to push sortable data into.
* When looking up values the container ensures that it is sorted for log(N) lookup
* After calling any of the lookup methods, it is no longer allowed to insert more data as this would invalidate the
* references that can be returned by the lookup methods
*/
template <typename T>
class SortedLookupVector
{
public:
using Base = std::vector<T>;
// First stage, insertions into the vector
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)
{
assert(!sorted.load(std::memory_order_acquire));
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
}
// Transition into second stage, ensures that the vector is sorted
typename Base::const_iterator upper_bound(const T & k)
{
sort();
return std::upper_bound(array.cbegin(), array.cend(), k);
}
// After ensuring that the vector is sorted by calling a lookup these are safe to call
typename Base::const_iterator cbegin() const { return array.cbegin(); }
typename Base::const_iterator cend() const { return array.cend(); }
private:
std::atomic<bool> sorted = false;
Base array;
mutable std::mutex lock;
// Double checked locking with SC atomics works in C++
// https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
// The first thread that calls one of the lookup methods sorts the data
// After calling the first lookup method it is no longer allowed to insert any data
// the array becomes immutable
void sort()
{
if (!sorted.load(std::memory_order_acquire))
{
std::lock_guard<std::mutex> l(lock);
if (!sorted.load(std::memory_order_relaxed))
{
std::sort(array.begin(), array.end());
sorted.store(true, std::memory_order_release);
}
}
}
};
class AsofRowRefs
{
public:
template <typename T>
struct Entry
{
using LookupType = SortedLookupPODArray<Entry<T>>;
using LookupType = SortedLookupVector<Entry<T>>;
using LookupPtr = std::unique_ptr<LookupType>;
T asof_value;
RowRef row_ref;
@ -53,16 +109,10 @@ public:
};
using Lookups = std::variant<
Entry<UInt32>::LookupType,
Entry<UInt64>::LookupType,
Entry<Float32>::LookupType,
Entry<Float64>::LookupType>;
struct LookupLists
{
mutable std::mutex mutex;
std::list<Lookups> lookups;
};
Entry<UInt32>::LookupPtr,
Entry<UInt64>::LookupPtr,
Entry<Float32>::LookupPtr,
Entry<Float64>::LookupPtr>;
enum class Type
{
@ -72,13 +122,23 @@ public:
keyf64,
};
AsofRowRefs() {}
AsofRowRefs(Type t);
static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);
void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num);
const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const;
// This will be synchronized by the rwlock mutex in Join.h
void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);
// This will internally synchronize
const RowRef * findAsof(Type type, const IColumn * asof_column, size_t row_num) const;
private:
Lookups * lookups = nullptr;
// Lookups can be stored in a HashTable because it is memmovable
// A std::variant contains a currently active type id (memmovable), together with a union of the types
// The types are all std::unique_ptr, which contains a single pointer, which is memmovable.
// Source: https://github.com/yandex/ClickHouse/issues/4906
Lookups lookups;
};
}

View File

@ -22,7 +22,6 @@
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <common/logger_useful.h>

View File

@ -16,7 +16,6 @@
#include <Common/ThreadPool.h>
namespace DB
{
@ -29,6 +28,8 @@ enum class BackgroundProcessingPoolTaskResult
ERROR,
NOTHING_TO_DO,
};
/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
* In this case, one task can run simultaneously from different threads.
* Designed for tasks that perform continuous background work (for example, merge).
@ -45,7 +46,6 @@ public:
using TaskHandle = std::shared_ptr<TaskInfo>;
BackgroundProcessingPool(int size_);
size_t getNumberOfThreads() const

View File

@ -1,7 +1,6 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/escapeForFileName.h>
#include <Common/MemoryTracker.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>

View File

@ -1,4 +1,3 @@
#include <Common/MemoryTracker.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Poco/File.h>

View File

@ -25,7 +25,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes -
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python protobuf
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2==2.7.5 pymongo tzlocal kafka-python protobuf redis
ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 17.09.1-ce

View File

@ -3,7 +3,7 @@
<type>loop</type>
<create_query>CREATE TABLE IF NOT EXISTS whitespaces(value String) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()</create_query>
<fill_query> INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)</fill_query>
<fill_query>INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)</fill_query>
<stop_conditions>
<all_of>

View File

@ -3,6 +3,12 @@
11 11
11 12
12 11
10 10
10 11 11
12 11
10 12
11 12
11 12
0
1
123 456

View File

@ -4,6 +4,17 @@ select s.a + 1 as a, s.a + 1 as b from (select 10 as a) s;
select s.a + 1 as b, s.a + 2 as a from (select 10 as a) s;
select s.a + 2 as b, s.a + 1 as a from (select 10 as a) s;
select a, a as a from (select 10 as a);
select s.a, a, a + 1 as a from (select 10 as a) as s;
select s.a + 2 as b, b - 1 as a from (select 10 as a) s;
select s.a as a, s.a + 2 as b from (select 10 as a) s;
select s.a + 1 as a, s.a + 2 as b from (select 10 as a) s;
select a + 1 as a, a + 1 as b from (select 10 as a);
select a + 1 as b, b + 1 as a from (select 10 as a); -- { serverError 174 }
select 10 as a, a + 1 as a; -- { serverError 179 }
with 10 as a select a as a; -- { serverError 179 }
with 10 as a select a + 1 as a; -- { serverError 179 }
SELECT 0 as t FROM (SELECT 1 as t) as inn WHERE inn.t = 1;
SELECT sum(value) as value FROM (SELECT 1 as value) as data WHERE data.value > 0;

View File

@ -0,0 +1,8 @@
(10,90)
(10.3,89.5)
(10,-90)
(1,1)
(nan,nan)
(0,3)
(nan,nan)
(nan,nan)

View File

@ -0,0 +1,9 @@
select arrayReduce('leastSqr', [1, 2, 3, 4], [100, 110, 120, 130]);
select arrayReduce('leastSqr', [1, 2, 3, 4], [100, 110, 120, 131]);
select arrayReduce('leastSqr', [-1, -2, -3, -4], [-100, -110, -120, -130]);
select arrayReduce('leastSqr', [5, 5.1], [6, 6.1]);
select arrayReduce('leastSqr', [0], [0]);
select arrayReduce('leastSqr', [3, 4], [3, 3]);
select arrayReduce('leastSqr', [3, 3], [3, 4]);
select arrayReduce('leastSqr', emptyArrayUInt8(), emptyArrayUInt8());

View File

@ -0,0 +1 @@
75000000

View File

@ -0,0 +1,31 @@
USE test;
DROP TABLE IF EXISTS tvs;
DROP TABLE IF EXISTS trades;
DROP TABLE IF EXISTS keys;
DROP TABLE IF EXISTS tv_times;
DROP TABLE IF EXISTS trade_times;
CREATE TABLE keys(k UInt32) ENGINE = MergeTree() ORDER BY k;
INSERT INTO keys(k) SELECT number FROM system.numbers LIMIT 5000;
CREATE TABLE tv_times(t UInt32) ENGINE = MergeTree() ORDER BY t;
INSERT INTO tv_times(t) SELECT number * 3 FROM system.numbers LIMIT 50000;
CREATE TABLE trade_times(t UInt32) ENGINE = MergeTree() ORDER BY t;
INSERT INTO trade_times(t) SELECT number * 10 FROM system.numbers LIMIT 15000;
CREATE TABLE tvs(k UInt32, t UInt32, tv UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO tvs(k,t,tv) SELECT k, t, t FROM keys CROSS JOIN tv_times;
CREATE TABLE trades(k UInt32, t UInt32, price UInt64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO trades(k,t,price) SELECT k, t, t FROM keys CROSS JOIN trade_times;
SELECT SUM(trades.price - tvs.tv) FROM trades ASOF LEFT JOIN tvs USING(k,t);
DROP TABLE tvs;
DROP TABLE trades;
DROP TABLE keys;
DROP TABLE tv_times;
DROP TABLE trade_times;

View File

@ -0,0 +1,8 @@
drop table if exists test.lc;
CREATE TABLE test.lc (`date` Date, `name` LowCardinality(Nullable(String)), `clicks` Nullable(Int32)) ENGINE = MergeTree() ORDER BY date SETTINGS index_granularity = 8192;
INSERT INTO test.lc SELECT '2019-01-01', null, 0 FROM numbers(1000000);
SELECT date, argMax(name, clicks) FROM test.lc GROUP BY date;
drop table if exists test.lc;

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS test.reserved_word_table;
CREATE TABLE test.reserved_word_table (`index` UInt8) ENGINE = MergeTree ORDER BY `index`;
DETACH TABLE test.reserved_word_table;
ATTACH TABLE test.reserved_word_table;
DROP TABLE test.reserved_word_table;

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.small_table"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.small_table (a UInt64 default 0, n UInt64) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY (a);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.small_table(n) SELECT * from system.numbers limit 100000;"
cached_query="SELECT count() FROM test.small_table where n > 0;"
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null
sleep 1
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'Seek')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ReadCompressedBytes')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UncompressedCacheHits')] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') AND (type = 2) ORDER BY event_time DESC LIMIT 1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.small_table"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,127 @@
select 1 = isValidUTF8('') from system.numbers limit 10;
select 1 = isValidUTF8('some text') from system.numbers limit 10;
select 1 = isValidUTF8('какой-то текст') from system.numbers limit 10;
select 1 = isValidUTF8('\x00') from system.numbers limit 10;
select 1 = isValidUTF8('\x66') from system.numbers limit 10;
select 1 = isValidUTF8('\x7F') from system.numbers limit 10;
select 1 = isValidUTF8('\x00\x7F') from system.numbers limit 10;
select 1 = isValidUTF8('\x7F\x00') from system.numbers limit 10;
select 1 = isValidUTF8('\xC2\x80') from system.numbers limit 10;
select 1 = isValidUTF8('\xDF\xBF') from system.numbers limit 10;
select 1 = isValidUTF8('\xE0\xA0\x80') from system.numbers limit 10;
select 1 = isValidUTF8('\xE0\xA0\xBF') from system.numbers limit 10;
select 1 = isValidUTF8('\xED\x9F\x80') from system.numbers limit 10;
select 1 = isValidUTF8('\xEF\x80\xBF') from system.numbers limit 10;
select 1 = isValidUTF8('\xF0\x90\xBF\x80') from system.numbers limit 10;
select 1 = isValidUTF8('\xF2\x81\xBE\x99') from system.numbers limit 10;
select 1 = isValidUTF8('\xF4\x8F\x88\xAA') from system.numbers limit 10;
select 1 = isValidUTF8('a') from system.numbers limit 10;
select 1 = isValidUTF8('\xc3\xb1') from system.numbers limit 10;
select 1 = isValidUTF8('\xe2\x82\xa1') from system.numbers limit 10;
select 1 = isValidUTF8('\xf0\x90\x8c\xbc') from system.numbers limit 10;
select 1 = isValidUTF8('안녕하세요, 세상') from system.numbers limit 10;
select 0 = isValidUTF8('\xc3\x28') from system.numbers limit 10;
select 0 = isValidUTF8('\xa0\xa1') from system.numbers limit 10;
select 0 = isValidUTF8('\xe2\x28\xa1') from system.numbers limit 10;
select 0 = isValidUTF8('\xe2\x82\x28') from system.numbers limit 10;
select 0 = isValidUTF8('\xf0\x28\x8c\xbc') from system.numbers limit 10;
select 0 = isValidUTF8('\xf0\x90\x28\xbc') from system.numbers limit 10;
select 0 = isValidUTF8('\xf0\x28\x8c\x28') from system.numbers limit 10;
select 0 = isValidUTF8('\xc0\x9f') from system.numbers limit 10;
select 0 = isValidUTF8('\xf5\xff\xff\xff') from system.numbers limit 10;
select 0 = isValidUTF8('\xed\xa0\x81') from system.numbers limit 10;
select 0 = isValidUTF8('\xf8\x90\x80\x80\x80') from system.numbers limit 10;
select 0 = isValidUTF8('12345678901234\xed') from system.numbers limit 10;
select 0 = isValidUTF8('123456789012345\xed') from system.numbers limit 10;
select 0 = isValidUTF8('123456789012345\xed123456789012345\xed') from system.numbers limit 10;
select 0 = isValidUTF8('123456789012345\xf1') from system.numbers limit 10;
select 0 = isValidUTF8('123456789012345\xc2') from system.numbers limit 10;
select 0 = isValidUTF8('\xC2\x7F') from system.numbers limit 10;
select 0 = isValidUTF8('\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xBF') from system.numbers limit 10;
select 0 = isValidUTF8('\xC0\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xC1\x00') from system.numbers limit 10;
select 0 = isValidUTF8('\xC2\x7F') from system.numbers limit 10;
select 0 = isValidUTF8('\xDF\xC0') from system.numbers limit 10;
select 0 = isValidUTF8('\xE0\x9F\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xE0\xC2\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xED\xA0\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xED\x7F\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xEF\x80\x00') from system.numbers limit 10;
select 0 = isValidUTF8('\xF0\x8F\x80\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xF0\xEE\x80\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\xF2\x90\x91\x7F') from system.numbers limit 10;
select 0 = isValidUTF8('\xF4\x90\x88\xAA') from system.numbers limit 10;
select 0 = isValidUTF8('\xF4\x00\xBF\xBF') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\xC2\x80\x00\x00\x00\xE1\x80\x80\x00\x00\xC2\xC2\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\xC2\xC2\x80\x00\x00\xE1\x80\x80\x00\x00\x00') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\xC2\x80') from system.numbers limit 10;
select 0 = isValidUTF8('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF0\x80\x80\x80') from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('some text', 9)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('какой-то текст', 27)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\x00', 1)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\x66', 1)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\x7F', 1)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\x00\x7F', 2)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\x7F\x00', 2)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xC2\x80', 2)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xDF\xBF', 2)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xE0\xA0\x80', 3)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xE0\xA0\xBF', 3)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xED\x9F\x80', 3)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xEF\x80\xBF', 3)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xF0\x90\xBF\x80', 4)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xF2\x81\xBE\x99', 4)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xF4\x8F\x88\xAA', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x80', 1)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xBF', 1)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xC0\x80', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xC1\x00', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xC2\x7F', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xDF\xC0', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xE0\x9F\x80', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xE0\xC2\x80', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xED\xA0\x80', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xED\x7F\x80', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xEF\x80\x00', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xF0\x8F\x80\x80', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xF0\xEE\x80\x80', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xF2\x90\x91\x7F', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xF4\x90\x88\xAA', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xF4\x00\xBF\xBF', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\xC2\x80\x00\x00\x00\xE1\x80\x80\x00\x00\xC2\xC2\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 32)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\xC2\xC2\x80\x00\x00\xE1\x80\x80\x00\x00\x00', 16)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80', 32)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1', 32)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\x80', 33)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF1\x80\xC2\x80', 34)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xF0\x80\x80\x80', 35)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('a', 1)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xc3\xb1', 2)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xe2\x82\xa1', 3)) from system.numbers limit 10;
select 1 = isValidUTF8(toFixedString('\xf0\x90\x8c\xbc', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xc3\x28', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xa0\xa1', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xe2\x28\xa1', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xe2\x82\x28', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xf0\x28\x8c\xbc', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xf0\x90\x28\xbc', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xf0\x28\x8c\x28', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xc0\x9f', 2)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xf5\xff\xff\xff', 4)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xed\xa0\x81', 3)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xf8\x90\x80\x80\x80', 5)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('123456789012345\xed', 16)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('123456789012345\xf1', 16)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('123456789012345\xc2', 16)) from system.numbers limit 10;
select 0 = isValidUTF8(toFixedString('\xC2\x7F', 2)) from system.numbers limit 10;

View File

@ -32,6 +32,8 @@ Parameters:
- `daemon` — Starts `clickhouse-copier` in daemon mode.
- `config` — The path to the `zookeeper.xml` file with the parameters for the connection to ZooKeeper.
- `task-path` — The path to the ZooKeeper node. This node is used for syncing `clickhouse-copier` processes and storing tasks. Tasks are stored in `$task-path/description`.
- `task-file` — Optional path to file with task configuration for initial upload to ZooKeeper.
- `task-upload-force` — Force upload `task-file` even if node already exists.
- `base-dir` — The path to logs and auxiliary files. When it starts, `clickhouse-copier` creates `clickhouse-copier_YYYYMMHHSS_<PID>` subdirectories in `$base-dir`. If this parameter is omitted, the directories are created in the directory where `clickhouse-copier` was launched.
## Format of zookeeper.xml

View File

@ -56,6 +56,10 @@ It doesn't detect the language. So for Turkish the result might not be exactly c
If the length of the UTF-8 byte sequence is different for upper and lower case of a code point, the result may be incorrect for this code point.
If the string contains a set of bytes that is not UTF-8, then the behavior is undefined.
## isValidUTF8
Returns 1, if the set of bytes is valid UTF-8 encoded, otherwise 0.
## reverse
Reverses the string (as a sequence of bytes).

View File

@ -46,22 +46,38 @@ The FINAL modifier can be used only for a SELECT from a CollapsingMergeTree tabl
### SAMPLE Clause {#select-sample-clause}
The `SAMPLE` clause allows for approximated query processing. Approximated query processing is only supported by the tables in the `MergeTree` family, and only if the sampling expression was specified during table creation (see [MergeTree engine](../operations/table_engines/mergetree.md)).
The `SAMPLE` clause allows for approximated query processing.
When data sampling is enabled, the query is not performed on all the data, but only on a certain fraction of data (sample). For example, if you need to calculate statistics for all the visits, it is enough to execute the query on the 1/10 fraction of all the visits and then multiply the result by 10.
Approximated query processing can be useful in the following cases:
- When you have strict timing requirements (like <100ms) but you can't justify the cost of additional hardware resources to meet them.
- When your raw data is not accurate, so approximation doesn't noticeably degrade the quality.
- Business requirements target approximate results (for cost-effectiveness, or in order to market exact results to premium users).
!!! note
You can only use sampling with the tables in the [MergeTree](../operations/table_engines/mergetree.md) family, and only if the sampling expression was specified during table creation (see [MergeTree engine](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table)).
The features of data sampling are listed below:
- Data sampling is a deterministic mechanism. The result of the same `SELECT .. SAMPLE` query is always the same.
- Sampling works consistently for different tables. For tables with a single sampling key, a sample with the same coefficient always selects the same subset of possible data. For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This means that you can use the sample in subqueries in the `IN` clause, as well as manually correlate results of different queries with samples.
- Sampling works consistently for different tables. For tables with a single sampling key, a sample with the same coefficient always selects the same subset of possible data. For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This means that you can use the sample in subqueries in the [IN](#select-in-operators) clause. Also, you can join samples using the [JOIN](#select-join) clause.
- Sampling allows reading less data from a disk. Note that you must specify the sampling key correctly. For more information, see [Creating a MergeTree Table](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table).
For the `SAMPLE` clause the following syntax is supported:
- `SAMPLE k`, where `k` is a decimal number from 0 to 1. The query is executed on `k` fraction of data. For example, `SAMPLE 0.1` runs the query on 10% of data. [Read more](#select-sample-k)
- `SAMPLE n`, where `n` is a sufficiently large integer. The query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows. [Read more](#select-sample-n)
- `SAMPLE k OFFSET m` where `k` and `m` are numbers from 0 to 1. The query is executed on a sample of `k` percent of the data. The data used for the sample is offset by `m` percent. [Read more](#select-sample-offset)
| SAMPLE&#160;Clause&#160;Syntax | Description |
| ---------------- | --------- |
| `SAMPLE k` | Here `k` is the number from 0 to 1.</br>The query is executed on `k` fraction of data. For example, `SAMPLE 0.1` runs the query on 10% of data. [Read more](#select-sample-k)|
| `SAMPLE n` | Here `n` is a sufficiently large integer.</br>The query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows. [Read more](#select-sample-n) |
| `SAMPLE k OFFSET m` | Here `k` and `m` are the numbers from 0 to 1.</br>The query is executed on a sample of `k` fraction of the data. The data used for the sample is offset by `m` fraction. [Read more](#select-sample-offset) |
#### SAMPLE k {#select-sample-k}
Here `k` is the number from 0 to 1 (both fractional and decimal notations are supported). For example, `SAMPLE 1/2` or `SAMPLE 0.5`.
In a `SAMPLE k` clause, the sample is taken from the `k` fraction of data. The example is shown below:
``` sql
@ -76,25 +92,29 @@ GROUP BY Title
ORDER BY PageViews DESC LIMIT 1000
```
In this example, the query is executed on a sample from 0.1 (10%) of data. Values of aggregate functions are not corrected automatically, so to get an approximate result, the value 'count()' is manually multiplied by 10.
In this example, the query is executed on a sample from 0.1 (10%) of data. Values of aggregate functions are not corrected automatically, so to get an approximate result, the value `count()` is manually multiplied by 10.
#### SAMPLE n {#select-sample-n}
In this case, the query is executed on a sample of at least `n` rows, where `n` is a sufficiently large integer. For example, `SAMPLE 10000000`.
Here `n` is a sufficiently large integer. For example, `SAMPLE 10000000`.
In this case, the query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows.
Since the minimum unit for data reading is one granule (its size is set by the `index_granularity` setting), it makes sense to set a sample that is much larger than the size of the granule.
When using the `SAMPLE n` clause, the relative coefficient is calculated dynamically. Since you do not know which relative percent of data was processed, you do not know the coefficient the aggregate functions should be multiplied by (for example, you do not know if `SAMPLE 1000000` was taken from a set of 10,000,000 rows or from a set of 1,000,000,000 rows). In this case, use the `_sample_factor` virtual column to get the approximate result.
When using the `SAMPLE n` clause, you don't know which relative percent of data was processed. So you don't know the coefficient the aggregate functions should be multiplied by. Use the `_sample_factor` virtual column to get the approximate result.
The `_sample_factor` column is where ClickHouse stores relative coefficients. This column is created automatically when you create a table with the specified sampling key. The usage example is shown below:
The `_sample_factor` column contains relative coefficients that are calculated dynamically. This column is created automatically when you [create](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table) a table with the specified sampling key. The usage examples of the `_sample_factor` column are shown below.
Let's consider the table `visits`, which contains the statistics about site visits. The first example shows how to calculate the number of page views:
``` sql
SELECT sum(Duration * _sample_factor)
SELECT sum(PageViews * _sample_factor)
FROM visits
SAMPLE 10000000
```
If you need to get the approximate count of rows in a `SELECT .. SAMPLE n` query, get the sum() of the `_sample_factor` column instead of counting the `count(*) * _sample_factor` value. For example:
The next example shows how to calculate the total number of visits:
``` sql
SELECT sum(_sample_factor)
@ -102,7 +122,7 @@ FROM visits
SAMPLE 10000000
```
Note that to calculate the average in a `SELECT .. SAMPLE n` query, you do not need to use the `_sample_factor` column:
The example below shows how to calculate the average session duration. Note that you don't need to use the relative coefficient to calculate the average values.
``` sql
SELECT avg(Duration)
@ -112,9 +132,9 @@ SAMPLE 10000000
#### SAMPLE k OFFSET m {#select-sample-offset}
You can specify the `SAMPLE k OFFSET m` clause, where `k` and `m` are numbers from 0 to 1. Examples are shown below.
Here `k` and `m` are numbers from 0 to 1. Examples are shown below.
Example 1.
**Example 1**
``` sql
SAMPLE 1/10
@ -124,13 +144,13 @@ In this example, the sample is 1/10th of all data:
`[++------------------]`
Example 2.
**Example 2**
``` sql
SAMPLE 1/10 OFFSET 1/2
```
Here, a sample of 10% is taken from the second half of data.
Here, a sample of 10% is taken from the second half of the data.
`[----------++--------]`

View File

@ -31,6 +31,8 @@ clickhouse-copier copier --daemon --config zookeeper.xml --task-path /task/path
- `daemon` - запускает `clickhouse-copier` в режиме демона.
- `config` - путь к файлу `zookeeper.xml` с параметрами соединения с ZooKeeper.
- `task-path` - путь к ноде ZooKeeper. Нода используется для синхронизации между процессами `clickhouse-copier` и для хранения заданий. Задания хранятся в `$task-path/description`.
- `task-file` - необязательный путь к файлу с описанием конфигурация заданий для загрузки в ZooKeeper.
- `task-upload-force` - Загрузить `task-file` в ZooKeeper даже если уже было загружено.
- `base-dir` - путь к логам и вспомогательным файлам. При запуске `clickhouse-copier` создает в `$base-dir` подкаталоги `clickhouse-copier_YYYYMMHHSS_<PID>`. Если параметр не указан, то каталоги будут создаваться в каталоге, где `clickhouse-copier` был запущен.
## Формат zookeeper.xml

View File

@ -38,6 +38,9 @@
Если длина UTF-8 последовательности байт различна для верхнего и нижнего регистра кодовой точки, то для этой кодовой точки, результат работы может быть некорректным.
Если строка содержит набор байт, не являющийся UTF-8, то поведение не определено.
## isValidUTF8
Возвращает 1, если набор байт является корректным в кодировке UTF-8, 0 иначе.
## reverse
Разворачивает строку (как последовательность байт).

View File

@ -45,19 +45,42 @@ SELECT [DISTINCT] expr_list
Модификатор FINAL может быть использован только при SELECT-е из таблицы типа CollapsingMergeTree. При указании FINAL, данные будут выбираться полностью "сколлапсированными". Стоит учитывать, что использование FINAL приводит к выбору кроме указанных в SELECT-е столбцов также столбцов, относящихся к первичному ключу. Также, запрос будет выполняться в один поток, и при выполнении запроса будет выполняться слияние данных. Это приводит к тому, что при использовании FINAL, запрос выполняется медленнее. В большинстве случаев, следует избегать использования FINAL. Подробнее смотрите раздел "Движок CollapsingMergeTree".
### Секция SAMPLE {#select-sample-clause}
### Секция SAMPLE
Секция `SAMPLE` позволяет выполнять запросы приближённо. Например, чтобы посчитать статистику по всем визитам, можно обработать 1/10 всех визитов и результат домножить на 10.
Секция SAMPLE позволяет выполнить запрос приближённо. Приближённое выполнение запроса поддерживается только таблицами типа MergeTree\* и только если при создании таблицы было указано выражение, по которому производится выборка (смотрите раздел "Движок MergeTree").
Сэмплирование имеет смысл, когда:
`SAMPLE` имеет вид `SAMPLE k`, где `k` - дробное число в интервале от 0 до 1, или `SAMPLE n`, где n - достаточно большое целое число.
1. Точность результата не важна, например, для оценочных расчетов.
2. Возможности аппаратной части не позволяют соответствовать строгим критериям. Например, время ответа должно быть <100 мс. При этом точность расчета имеет более низкий приоритет.
3. Точность результата участвует в бизнес-модели сервиса. Например, пользователи с бесплатной подпиской на сервис могут получать отчеты с меньшей точностью, чем пользователи с премиум подпиской.
В первом случае, запрос будет выполнен по k-доле данных. Например, если указано `SAMPLE 0.1`, то запрос будет выполнен по 10% данных.
Во втором случае, запрос будет выполнен по выборке из не более n строк. Например, если указано `SAMPLE 10000000`, то запрос будет выполнен по не более чем 10 000 000 строкам.
!!! note "Внимание"
Не стоит использовать сэмплирование в тех задачах, где важна точность расчетов. Например, при работе с финансовыми отчетами.
Пример:
Свойства сэмплирования:
``` sql
- Сэмплирование работает детерминированно. При многократном выполнении одного и того же запроса `SELECT .. SAMPLE`, результат всегда будет одинаковым.
- Сэмплирование поддерживает консистентность для разных таблиц. Имеется в виду, что для таблиц с одним и тем же ключом сэмплирования, подмножество данных в выборках будет одинаковым (выборки при этом должны быть сформированы для одинаковой доли данных). Например, выборка по идентификаторам посетителей выберет из разных таблиц строки с одинаковым подмножеством всех возможных идентификаторов. Это свойство позволяет использовать выборки в подзапросах в секции [IN](#select-in-operators), а также объединять выборки с помощью [JOIN](#select-join).
- Сэмплирование позволяет читать меньше данных с диска. Обратите внимание, для этого необходимо корректно указать ключ сэмплирования. Подробнее см. в разделе [Создание таблицы MergeTree](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table).
Сэмплирование поддерживается только таблицами семейства [MergeTree](../operations/table_engines/mergetree.md) и только в том случае, если для таблиц был указан ключ сэмплирования (выражение, на основе которого должна производиться выборка). Подробнее см. в разделе [Создание таблиц MergeTree](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table).
Выражение `SAMPLE` в запросе можно задать следующими способами:
| Способ&#160;задания&#160;SAMPLE| Описание |
| ---------------- | --------- |
| `SAMPLE k` | Здесь `k` это дробное число в интервале от 0 до 1.<br/> Запрос будет выполнен по `k` доле данных. Например, если указано `SAMPLE 1/10`, то запрос будет выполнен для выборки из 1/10 данных. [Подробнее](#select-sample-k)|
| `SAMPLE n` | Здесь `n` это достаточно большое целое число.</br> Запрос будет выполнен для выборки, состоящей из не менее чем `n` строк. Например, если указано `SAMPLE 10000000`, то запрос будет выполнен для не менее чем 10,000,000 строк. [Подробнее](#select-sample-n) |
| `SAMPLE k OFFSET m` | Здесь `k` и `m` числа от 0 до 1.</br> Запрос будет выполнен по `k` доле данных. При этом выборка будет сформирована со смещением на `m` долю. [Подробнее](#select-sample-offset) |
#### SAMPLE k {#select-sample-k}
Здесь `k` число в интервале от 0 до 1. Поддерживается как дробная, так и десятичная форма записи. Например, `SAMPLE 1/2` или `SAMPLE 0.5`.
Если задано выражение `SAMPLE k`, запрос будет выполнен для `k` доли данных. Рассмотрим пример:
```sql
SELECT
Title,
count() * 10 AS PageViews
@ -65,22 +88,76 @@ FROM hits_distributed
SAMPLE 0.1
WHERE
CounterID = 34
AND toDate(EventDate) >= toDate('2013-01-29')
AND toDate(EventDate) <= toDate('2013-02-04')
AND NOT DontCountHits
AND NOT Refresh
AND Title != ''
GROUP BY Title
ORDER BY PageViews DESC LIMIT 1000
```
В этом примере, запрос выполняется по выборке из 0.1 (10%) данных. Значения агрегатных функций не корректируются автоматически, поэтому для получения приближённого результата, значение count() вручную домножается на 10.
В этом примере запрос выполняется по выборке из 0.1 (10%) данных. Значения агрегатных функций не корректируются автоматически, поэтому чтобы получить приближённый результат, значение `count()` нужно вручную умножить на 10.
При использовании варианта вида `SAMPLE 10000000`, нет информации, какая относительная доля данных была обработана, и на что следует домножить агрегатные функции, поэтому такой способ записи подходит не для всех случаев.
Выборка с указанием относительного коэффициента является "согласованной": для таблиц с одним и тем же ключом сэмплирования, выборка с одинаковой относительной долей всегда будет составлять одно и то же подмножество данных. То есть выборка из разных таблиц, на разных серверах, в разное время, формируется одинаковым образом.
Выборка с указанием относительного коэффициента является "согласованной": если рассмотреть все возможные данные, которые могли бы быть в таблице, то выборка (при использовании одного выражения сэмплирования, указанного при создании таблицы), с одинаковым коэффициентом, выбирает всегда одно и то же подмножество этих всевозможных данных. То есть, выборка из разных таблиц, на разных серверах, в разное время, делается одинаковым образом.
#### SAMPLE n {#select-sample-n}
Например, выборка по идентификаторам посетителей, выберет из разных таблиц строки с одинаковым подмножеством всех возможных идентификаторов посетителей. Это позволяет использовать выборку в подзапросах в секции IN, а также при ручном сопоставлении результатов разных запросов с выборками.
Здесь `n` это достаточно большое целое число. Например, `SAMPLE 10000000`.
Если задано выражение `SAMPLE n`, запрос будет выполнен для выборки из не менее `n` строк (но не значительно больше этого значения). Например, если задать `SAMPLE 10000000`, в выборку попадут не менее 10,000,000 строк.
!!! note "Примечание"
Следует иметь в виду, что `n` должно быть достаточно большим числом. Так как минимальной единицей данных для чтения является одна гранула (её размер задаётся настройкой `index_granularity` для таблицы), имеет смысл создавать выборки, размер которых существенно превосходит размер гранулы.
При выполнении `SAMPLE n` коэффициент сэмплирования заранее неизвестен (то есть нет информации о том, относительно какого количества данных будет сформирована выборка). Чтобы узнать коэффициент сэмплирования, используйте столбец `_sample_factor`.
Виртуальный столбец `_sample_factor` автоматически создается в тех таблицах, для которых задано выражение `SAMPLE BY` (подробнее см. в разделе [Создание таблицы MergeTree](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table)). В столбце содержится коэффициент сэмплирования для таблицы он рассчитывается динамически по мере добавления данных в таблицу. Ниже приведены примеры использования столбца `_sample_factor`.
Предположим, у нас есть таблица, в которой ведется статистика посещений сайта. Пример ниже показывает, как рассчитать суммарное число просмотров:
```sql
SELECT sum(PageViews * _sample_factor)
FROM visits
SAMPLE 10000000
```
Следующий пример показывает, как посчитать общее число визитов:
```sql
SELECT sum(_sample_factor)
FROM visits
SAMPLE 10000000
```
В примере ниже рассчитывается среднее время на сайте. Обратите внимание, при расчете средних значений, умножать результат на коэффициент сэмплирования не нужно.
```sql
SELECT avg(Duration)
FROM visits
SAMPLE 10000000
```
#### SAMPLE k OFFSET m {#select-sample-offset}
Здесь `k` и `m` числа в интервале от 0 до 1. Например, `SAMPLE 0.1 OFFSET 0.5`. Поддерживается как дробная, так и десятичная форма записи.
При задании `SAMPLE k OFFSET m`, выборка будет сформирована из `k` доли данных со смещением на долю `m`. Примеры приведены ниже.
**Пример 1**
```sql
SAMPLE 1/10
```
В этом примере выборка будет сформирована по 1/10 доле всех данных:
`[++------------------]`
**Пример 2**
```sql
SAMPLE 1/10 OFFSET 1/2
```
Здесь выборка, которая состоит из 1/10 доли данных, взята из второй половины данных.
`[----------++--------]`
### Секция ARRAY JOIN {#select-array-join-clause}

View File

@ -44,7 +44,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
{
using namespace DB;
Memory direct_buf(block_size, sysconf(_SC_PAGESIZE));
Memory<> direct_buf(block_size, sysconf(_SC_PAGESIZE));
std::vector<char> simple_buf(block_size);
char * buf;

View File

@ -51,9 +51,9 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
AIOContext ctx;
std::vector<Memory> buffers(buffers_count);
std::vector<Memory<>> buffers(buffers_count);
for (size_t i = 0; i < buffers_count; ++i)
buffers[i] = Memory(block_size, sysconf(_SC_PAGESIZE));
buffers[i] = Memory<>(block_size, sysconf(_SC_PAGESIZE));
drand48_data rand_data;
timespec times;