Merge remote-tracking branch 'upstream/master' into fix25

This commit is contained in:
proller 2019-08-13 11:37:02 +00:00
commit dc0b313c6a
25 changed files with 929 additions and 239 deletions

View File

@ -13,7 +13,7 @@ ClickHouse is an open-source column-oriented database management system that all
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person. * You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events ## Upcoming Events
* [ClickHouse Meetup in Mountain View](https://www.eventbrite.com/e/meetup-clickhouse-in-the-south-bay-registration-65935505873) on August 13.
* [ClickHouse Meetup in Moscow](https://yandex.ru/promo/clickhouse/moscow-2019) on September 5. * [ClickHouse Meetup in Moscow](https://yandex.ru/promo/clickhouse/moscow-2019) on September 5.
* [ClickHouse Meetup in Hong Kong](https://www.meetup.com/Hong-Kong-Machine-Learning-Meetup/events/263580542/) on October 17.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20. * [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27. * [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.

View File

@ -26,6 +26,11 @@ void CurrentThread::updatePerformanceCounters()
current_thread->updatePerformanceCounters(); current_thread->updatePerformanceCounters();
} }
bool CurrentThread::isInitialized()
{
return current_thread;
}
ThreadStatus & CurrentThread::get() ThreadStatus & CurrentThread::get()
{ {
if (unlikely(!current_thread)) if (unlikely(!current_thread))

View File

@ -33,6 +33,9 @@ class InternalTextLogsQueue;
class CurrentThread class CurrentThread
{ {
public: public:
/// Return true in case of successful initializaiton
static bool isInitialized();
/// Handler to current thread /// Handler to current thread
static ThreadStatus & get(); static ThreadStatus & get();

View File

@ -1,3 +1,4 @@
#include <malloc.h>
#include <new> #include <new>
#include <common/config_common.h> #include <common/config_common.h>
@ -49,6 +50,11 @@ ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [
#else #else
if (size) if (size)
CurrentMemoryTracker::free(size); CurrentMemoryTracker::free(size);
#ifdef _GNU_SOURCE
/// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
else
CurrentMemoryTracker::free(malloc_usable_size(ptr));
#endif
#endif #endif
} }
catch (...) catch (...)

View File

@ -78,11 +78,12 @@ binary_value_info getLeadingAndTrailingBits(const T & value)
const UInt8 lz = getLeadingZeroBits(value); const UInt8 lz = getLeadingZeroBits(value);
const UInt8 tz = getTrailingZeroBits(value); const UInt8 tz = getTrailingZeroBits(value);
const UInt8 data_size = value == 0 ? 0 : static_cast<UInt8>(bit_size - lz - tz); const UInt8 data_size = value == 0 ? 0 : static_cast<UInt8>(bit_size - lz - tz);
return binary_value_info{lz, data_size, tz}; return binary_value_info{lz, data_size, tz};
} }
template <typename T> template <typename T>
UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest) UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 dest_size)
{ {
static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T)); static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
// -1 since there must be at least 1 non-zero bit. // -1 since there must be at least 1 non-zero bit.
@ -91,6 +92,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
if (source_size % sizeof(T) != 0) if (source_size % sizeof(T) != 0)
throw Exception("Cannot compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS); throw Exception("Cannot compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS);
const char * source_end = source + source_size; const char * source_end = source + source_size;
const char * dest_end = dest + dest_size;
const UInt32 items_count = source_size / sizeof(T); const UInt32 items_count = source_size / sizeof(T);
@ -110,7 +112,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
dest += sizeof(prev_value); dest += sizeof(prev_value);
} }
WriteBuffer buffer(dest, getCompressedDataSize(sizeof(T), source_size - sizeof(items_count) - sizeof(prev_value))); WriteBuffer buffer(dest, dest_end - dest);
BitWriter writer(buffer); BitWriter writer(buffer);
while (source < source_end) while (source < source_end)
@ -265,24 +267,26 @@ UInt32 CompressionCodecGorilla::doCompressData(const char * source, UInt32 sourc
dest[1] = bytes_to_skip; dest[1] = bytes_to_skip;
memcpy(&dest[2], source, bytes_to_skip); memcpy(&dest[2], source, bytes_to_skip);
size_t start_pos = 2 + bytes_to_skip; size_t start_pos = 2 + bytes_to_skip;
UInt32 compressed_size = 0; UInt32 result_size = 0;
const UInt32 compressed_size = getMaxCompressedDataSize(source_size);
switch (data_bytes_size) switch (data_bytes_size)
{ {
case 1: case 1:
compressed_size = compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); result_size = compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
break; break;
case 2: case 2:
compressed_size = compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); result_size = compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
break; break;
case 4: case 4:
compressed_size = compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); result_size = compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
break; break;
case 8: case 8:
compressed_size = compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); result_size = compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
break; break;
} }
return 1 + 1 + compressed_size; return 1 + 1 + result_size;
} }
void CompressionCodecGorilla::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 /* uncompressed_size */) const void CompressionCodecGorilla::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 /* uncompressed_size */) const

View File

@ -49,8 +49,8 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
UInt8 header_size = getHeaderSize(); UInt8 header_size = getHeaderSize();
UInt32 decompressed_size = unalignedLoad<UInt32>(&source[5]); UInt32 decompressed_size = unalignedLoad<UInt32>(&source[5]);
doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size); doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size);
return decompressed_size;
return decompressed_size;
} }
UInt32 ICompressionCodec::readCompressedBlockSize(const char * source) UInt32 ICompressionCodec::readCompressedBlockSize(const char * source)

View File

@ -1,10 +1,14 @@
#include <Compression/CompressionCodecDoubleDelta.h> #include <Compression/CompressionFactory.h>
#include <Compression/CompressionCodecGorilla.h>
#include <Core/Types.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromMemory.h>
#include <Common/PODArray.h> #include <Common/PODArray.h>
#include <Core/Types.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/IParser.h>
#include <Parsers/TokenIterator.h>
#include <boost/format.hpp> #include <boost/format.hpp>
@ -20,10 +24,44 @@
#include <string.h> #include <string.h>
/// For the expansion of gtest macros.
#if defined(__clang__)
#pragma clang diagnostic ignored "-Wdeprecated"
#elif defined (__GNUC__)
#pragma GCC diagnostic ignored "-Wdeprecated-copy"
#endif
#include <gtest/gtest.h> #include <gtest/gtest.h>
using namespace DB; using namespace DB;
namespace std
{
template <typename T>
std::ostream & operator<<(std::ostream & ostr, const std::optional<T> & opt)
{
if (!opt)
{
return ostr << "<empty optional>";
}
return ostr << *opt;
}
template <typename T>
std::vector<T> operator+(std::vector<T> && left, std::vector<T> && right)
{
std::vector<T> result(std::move(left));
std::move(std::begin(right), std::end(right), std::back_inserter(result));
return result;
}
}
namespace
{
template <typename T> template <typename T>
std::string bin(const T & value, size_t bits = sizeof(T)*8) std::string bin(const T & value, size_t bits = sizeof(T)*8)
{ {
@ -37,43 +75,46 @@ std::string bin(const T & value, size_t bits = sizeof(T)*8)
template <typename T> template <typename T>
const char* type_name() const char* type_name()
{ {
#define MAKE_TYPE_NAME(TYPE) \
if constexpr (std::is_same_v<TYPE, T>) return #TYPE
MAKE_TYPE_NAME(UInt8);
MAKE_TYPE_NAME(UInt16);
MAKE_TYPE_NAME(UInt32);
MAKE_TYPE_NAME(UInt64);
MAKE_TYPE_NAME(Int8);
MAKE_TYPE_NAME(Int16);
MAKE_TYPE_NAME(Int32);
MAKE_TYPE_NAME(Int64);
MAKE_TYPE_NAME(Float32);
MAKE_TYPE_NAME(Float64);
#undef MAKE_TYPE_NAME
return typeid(T).name(); return typeid(T).name();
} }
template <> template <typename T>
const char* type_name<UInt32>() DataTypePtr makeDataType()
{ {
return "uint32"; #define MAKE_DATA_TYPE(TYPE) \
} if constexpr (std::is_same_v<T, TYPE>) return std::make_shared<DataType ## TYPE>()
template <> MAKE_DATA_TYPE(UInt8);
const char* type_name<Int32>() MAKE_DATA_TYPE(UInt16);
{ MAKE_DATA_TYPE(UInt32);
return "int32"; MAKE_DATA_TYPE(UInt64);
} MAKE_DATA_TYPE(Int8);
MAKE_DATA_TYPE(Int16);
MAKE_DATA_TYPE(Int32);
MAKE_DATA_TYPE(Int64);
MAKE_DATA_TYPE(Float32);
MAKE_DATA_TYPE(Float64);
template <> #undef MAKE_DATA_TYPE
const char* type_name<UInt64>()
{
return "uint64";
}
template <> assert(false && "unsupported size");
const char* type_name<Int64>() return nullptr;
{
return "int64";
}
template <>
const char* type_name<Float32>()
{
return "float";
}
template <>
const char* type_name<Float64>()
{
return "double";
} }
@ -135,52 +176,100 @@ template <typename T, typename ContainerLeft, typename ContainerRight>
return result; return result;
} }
struct CodecTestParam struct Codec
{ {
std::string type_name; std::string codec_statement;
std::vector<char> source_data; std::optional<double> expected_compression_ratio;
UInt8 data_byte_size;
double min_compression_ratio;
std::string case_name;
// to allow setting ratio after building with complex builder functions. explicit Codec(std::string codec_statement_, std::optional<double> expected_compression_ratio_ = std::nullopt)
CodecTestParam && setRatio(const double & ratio) && : codec_statement(std::move(codec_statement_)),
{ expected_compression_ratio(expected_compression_ratio_)
this->min_compression_ratio = ratio; {}
return std::move(*this);
} Codec()
: Codec(std::string())
{}
}; };
CodecTestParam operator+(CodecTestParam && left, CodecTestParam && right)
struct CodecTestSequence
{ {
assert(left.type_name == right.type_name); std::string name;
assert(left.data_byte_size == right.data_byte_size); std::vector<char> serialized_data;
DataTypePtr data_type;
std::vector data(std::move(left.source_data)); CodecTestSequence()
data.insert(data.end(), right.source_data.begin(), right.source_data.end()); : name(),
serialized_data(),
data_type()
{}
return CodecTestParam{ CodecTestSequence(std::string name_, std::vector<char> serialized_data_, DataTypePtr data_type_)
left.type_name, : name(name_),
std::move(data), serialized_data(serialized_data_),
left.data_byte_size, data_type(data_type_)
std::min(left.min_compression_ratio, right.min_compression_ratio), {}
left.case_name + " + " + right.case_name
CodecTestSequence(const CodecTestSequence &) = default;
CodecTestSequence & operator=(const CodecTestSequence &) = default;
CodecTestSequence(CodecTestSequence &&) = default;
CodecTestSequence & operator=(CodecTestSequence &&) = default;
};
CodecTestSequence operator+(CodecTestSequence && left, CodecTestSequence && right)
{
assert(left.data_type->equals(*right.data_type));
std::vector<char> data(std::move(left.serialized_data));
data.insert(data.end(), right.serialized_data.begin(), right.serialized_data.end());
return CodecTestSequence{
left.name + " + " + right.name,
std::move(data),
std::move(left.data_type)
}; };
} }
std::ostream & operator<<(std::ostream & ostr, const CodecTestParam & param) template <typename T>
CodecTestSequence operator*(CodecTestSequence && left, T times)
{ {
return ostr << "name: " << param.case_name std::vector<char> data(std::move(left.serialized_data));
<< "\ntype name:" << param.type_name const size_t initial_size = data.size();
<< "\nbyte size: " << static_cast<UInt32>(param.data_byte_size) const size_t final_size = initial_size * times;
<< "\ndata size: " << param.source_data.size();
data.reserve(final_size);
for (T i = 0; i < times; ++i)
{
data.insert(data.end(), data.begin(), data.begin() + initial_size);
}
return CodecTestSequence{
left.name + " x " + std::to_string(times),
std::move(data),
std::move(left.data_type)
};
} }
// compression ratio < 1.0 means that codec output is smaller than input. std::ostream & operator<<(std::ostream & ostr, const Codec & codec)
const double DEFAULT_MIN_COMPRESSION_RATIO = 1.0; {
return ostr << "Codec{"
<< "name: " << codec.codec_statement
<< ", expected_compression_ratio: " << codec.expected_compression_ratio
<< "}";
}
std::ostream & operator<<(std::ostream & ostr, const CodecTestSequence & seq)
{
return ostr << "CodecTestSequence{"
<< "name: " << seq.name
<< ", type name: " << seq.data_type->getName()
<< ", data size: " << seq.serialized_data.size() << " bytes"
<< "}";
}
template <typename T, typename... Args> template <typename T, typename... Args>
CodecTestParam makeParam(Args && ... args) CodecTestSequence makeSeq(Args && ... args)
{ {
std::initializer_list<T> vals{static_cast<T>(args)...}; std::initializer_list<T> vals{static_cast<T>(args)...};
std::vector<char> data(sizeof(T) * std::size(vals)); std::vector<char> data(sizeof(T) * std::size(vals));
@ -192,14 +281,17 @@ CodecTestParam makeParam(Args && ... args)
write_pos += sizeof(v); write_pos += sizeof(v);
} }
return CodecTestParam{type_name<T>(), std::move(data), sizeof(T), DEFAULT_MIN_COMPRESSION_RATIO, return CodecTestSequence{
(boost::format("%1% values of %2%") % std::size(vals) % type_name<T>()).str()}; (boost::format("%1% values of %2%") % std::size(vals) % type_name<T>()).str(),
std::move(data),
makeDataType<T>()
};
} }
template <typename T, size_t Begin = 1, size_t End = 10001, typename Generator> template <typename T, typename Generator>
CodecTestParam generateParam(Generator gen, const char* gen_name) CodecTestSequence generateSeq(Generator gen, const char* gen_name, size_t Begin = 0, size_t End = 10000)
{ {
static_assert (End >= Begin, "End must be not less than Begin"); assert (End >= Begin);
std::vector<char> data(sizeof(T) * (End - Begin)); std::vector<char> data(sizeof(T) * (End - Begin));
char * write_pos = data.data(); char * write_pos = data.data();
@ -211,89 +303,104 @@ CodecTestParam generateParam(Generator gen, const char* gen_name)
write_pos += sizeof(v); write_pos += sizeof(v);
} }
return CodecTestParam{type_name<T>(), std::move(data), sizeof(T), DEFAULT_MIN_COMPRESSION_RATIO, return CodecTestSequence{
(boost::format("%1% values of %2% from %3%") % (End - Begin) % type_name<T>() % gen_name).str()}; (boost::format("%1% values of %2% from %3%") % (End - Begin) % type_name<T>() % gen_name).str(),
std::move(data),
makeDataType<T>()
};
} }
void TestTranscoding(ICompressionCodec * codec, const CodecTestParam & param)
{
const auto & source_data = param.source_data;
const UInt32 encoded_max_size = codec->getCompressedReserveSize(source_data.size()); class CodecTest : public ::testing::TestWithParam<std::tuple<Codec, CodecTestSequence>>
PODArray<char> encoded(encoded_max_size);
const UInt32 encoded_size = codec->compress(source_data.data(), source_data.size(), encoded.data());
encoded.resize(encoded_size);
PODArray<char> decoded(source_data.size());
const UInt32 decoded_size = codec->decompress(encoded.data(), encoded.size(), decoded.data());
decoded.resize(decoded_size);
switch (param.data_byte_size)
{
case 1:
ASSERT_TRUE(EqualByteContainersAs<UInt8>(source_data, decoded));
break;
case 2:
ASSERT_TRUE(EqualByteContainersAs<UInt16>(source_data, decoded));
break;
case 4:
ASSERT_TRUE(EqualByteContainersAs<UInt32>(source_data, decoded));
break;
case 8:
ASSERT_TRUE(EqualByteContainersAs<UInt64>(source_data, decoded));
break;
default:
FAIL() << "Invalid data_byte_size: " << param.data_byte_size;
}
const auto header_size = codec->getHeaderSize();
const auto compression_ratio = (encoded_size - header_size) / (source_data.size() * 1.0);
ASSERT_LE(compression_ratio, param.min_compression_ratio)
<< "\n\tdecoded size: " << source_data.size()
<< "\n\tencoded size: " << encoded_size
<< "(no header: " << encoded_size - header_size << ")";
}
class CodecTest : public ::testing::TestWithParam<CodecTestParam>
{ {
public: public:
static void SetUpTestCase() enum MakeCodecParam
{ {
// To make random predicatble and avoid failing test "out of the blue". CODEC_WITH_DATA_TYPE,
srand(0); CODEC_WITHOUT_DATA_TYPE,
};
CompressionCodecPtr makeCodec(MakeCodecParam with_data_type) const
{
const auto & codec_string = std::get<0>(GetParam()).codec_statement;
const auto & data_type = with_data_type == CODEC_WITH_DATA_TYPE ? std::get<1>(GetParam()).data_type : nullptr;
const std::string codec_statement = "(" + codec_string + ")";
Tokens tokens(codec_statement.begin().base(), codec_statement.end().base());
IParser::Pos token_iterator(tokens);
Expected expected;
ASTPtr codec_ast;
ParserCodec parser;
parser.parse(token_iterator, codec_ast, expected);
return CompressionCodecFactory::instance().get(codec_ast, data_type);
}
void testTranscoding(ICompressionCodec & codec)
{
const auto & test_sequence = std::get<1>(GetParam());
const auto & source_data = test_sequence.serialized_data;
const UInt32 encoded_max_size = codec.getCompressedReserveSize(source_data.size());
PODArray<char> encoded(encoded_max_size);
const UInt32 encoded_size = codec.compress(source_data.data(), source_data.size(), encoded.data());
encoded.resize(encoded_size);
PODArray<char> decoded(source_data.size());
const UInt32 decoded_size = codec.decompress(encoded.data(), encoded.size(), decoded.data());
decoded.resize(decoded_size);
switch (test_sequence.data_type->getSizeOfValueInMemory())
{
case 1:
ASSERT_TRUE(EqualByteContainersAs<UInt8>(source_data, decoded));
break;
case 2:
ASSERT_TRUE(EqualByteContainersAs<UInt16>(source_data, decoded));
break;
case 4:
ASSERT_TRUE(EqualByteContainersAs<UInt32>(source_data, decoded));
break;
case 8:
ASSERT_TRUE(EqualByteContainersAs<UInt64>(source_data, decoded));
break;
default:
FAIL() << "Invalid test sequence data type: " << test_sequence.data_type->getName();
}
const auto header_size = codec.getHeaderSize();
const auto compression_ratio = (encoded_size - header_size) / (source_data.size() * 1.0);
const auto & codec_spec = std::get<0>(GetParam());
if (codec_spec.expected_compression_ratio)
{
ASSERT_LE(compression_ratio, *codec_spec.expected_compression_ratio)
<< "\n\tdecoded size: " << source_data.size()
<< "\n\tencoded size: " << encoded_size
<< "(no header: " << encoded_size - header_size << ")";
}
} }
}; };
TEST_P(CodecTest, DoubleDelta) TEST_P(CodecTest, TranscodingWithDataType)
{ {
auto param = GetParam(); const auto codec = makeCodec(CODEC_WITH_DATA_TYPE);
auto codec = std::make_unique<CompressionCodecDoubleDelta>(param.data_byte_size); testTranscoding(*codec);
if (param.type_name == type_name<Float32>() || param.type_name == type_name<Float64>())
{
// dd doesn't work great with many cases of integers and may result in very poor compression rate.
param.min_compression_ratio *= 1.5;
}
TestTranscoding(codec.get(), param);
} }
TEST_P(CodecTest, Gorilla) TEST_P(CodecTest, TranscodingWithoutDataType)
{ {
auto param = GetParam(); const auto codec = makeCodec(CODEC_WITHOUT_DATA_TYPE);
auto codec = std::make_unique<CompressionCodecGorilla>(param.data_byte_size); testTranscoding(*codec);
if (param.type_name == type_name<UInt32>() || param.type_name == type_name<Int32>()
|| param.type_name == type_name<UInt64>() || param.type_name == type_name<Int64>())
{
// gorilla doesn't work great with many cases of integers and may result in very poor compression rate.
param.min_compression_ratio *= 1.5;
}
TestTranscoding(codec.get(), param);
} }
///////////////////////////////////////////////////////////////////////////////////////////////////
// Here we use generators to produce test payload for codecs. // Here we use generators to produce test payload for codecs.
// Generator is a callable that should produce output value of the same type as input value. // Generator is a callable that can produce infinite number of values,
// output value MUST be of the same type input value.
///////////////////////////////////////////////////////////////////////////////////////////////////
auto SameValueGenerator = [](auto value) auto SameValueGenerator = [](auto value)
{ {
@ -332,141 +439,427 @@ auto SequentialGenerator = [](auto stride = 1)
//}; //};
template <typename T> template <typename T>
using uniform_distribution =
typename std::conditional_t<std::is_floating_point_v<T>, std::uniform_real_distribution<T>,
typename std::conditional_t<std::is_integral_v<T>, std::uniform_int_distribution<T>, void>>;
template <typename T = Int32>
struct MonotonicGenerator struct MonotonicGenerator
{ {
MonotonicGenerator(T stride_ = 1, size_t max_step_ = 10) MonotonicGenerator(T stride_ = 1, T max_step = 10)
: prev_value(0), : prev_value(0),
stride(stride_), stride(stride_),
max_step(max_step_) random_engine(0),
distribution(0, max_step)
{} {}
template <typename U> template <typename U>
U operator()(U) U operator()(U)
{ {
const U result = prev_value + static_cast<T>(stride * (rand() % max_step)); prev_value = prev_value + stride * distribution(random_engine);
return static_cast<U>(prev_value);
prev_value = result;
return result;
} }
private:
T prev_value; T prev_value;
const T stride; const T stride;
const size_t max_step; std::default_random_engine random_engine;
}; uniform_distribution<T> distribution;
auto MinMaxGenerator = [](auto i)
{
if (i % 2 == 0)
{
return std::numeric_limits<decltype(i)>::min();
}
else
{
return std::numeric_limits<decltype(i)>::max();
}
}; };
template <typename T> template <typename T>
struct RandomGenerator struct RandomGenerator
{ {
RandomGenerator(T seed = 0, T value_cap_ = std::numeric_limits<T>::max()) RandomGenerator(T seed = 0, T value_min = std::numeric_limits<T>::min(), T value_max = std::numeric_limits<T>::max())
: e(seed), : random_engine(seed),
value_cap(value_cap_) distribution(value_min, value_max)
{ {
} }
template <typename U> template <typename U>
U operator()(U i) U operator()(U)
{ {
return static_cast<decltype(i)>(distribution(e) % value_cap); return static_cast<U>(distribution(random_engine));
} }
private: private:
std::default_random_engine e; std::default_random_engine random_engine;
std::uniform_int_distribution<T> distribution; uniform_distribution<T> distribution;
const T value_cap;
}; };
auto RandomishGenerator = [](auto i) auto RandomishGenerator = [](auto i)
{ {
return static_cast<decltype(i)>(sin(static_cast<double>(i) * i) * i); return static_cast<decltype(i)>(sin(static_cast<double>(i * i)) * i);
}; };
// helper macro to produce human-friendly test case name auto MinMaxGenerator = []()
{
return [step = 0](auto i) mutable
{
if (step++ % 2 == 0)
{
return std::numeric_limits<decltype(i)>::min();
}
else
{
return std::numeric_limits<decltype(i)>::max();
}
};
};
// Fill dest value with 0x00 or 0xFF
auto FFand0Generator = []()
{
return [step = 0](auto i) mutable
{
decltype(i) result;
if (step++ % 2 == 0)
{
memset(&result, 0, sizeof(result));
}
else
{
memset(&result, 0xFF, sizeof(result));
}
return result;
};
};
// Makes many sequences with generator, first sequence length is 1, second is 2... up to `sequences_count`.
template <typename T, typename Generator>
std::vector<CodecTestSequence> generatePyramidOfSequences(const size_t sequences_count, Generator && generator, const char* generator_name)
{
std::vector<CodecTestSequence> sequences;
sequences.reserve(sequences_count);
for (size_t i = 1; i < sequences_count; ++i)
{
std::string name = generator_name + std::string(" from 0 to ") + std::to_string(i);
sequences.push_back(generateSeq<T>(std::forward<decltype(generator)>(generator), name.c_str(), 0, i));
}
return sequences;
};
// helper macro to produce human-friendly sequence name from generator
#define G(generator) generator, #generator #define G(generator) generator, #generator
const auto DefaultCodecsToTest = ::testing::Values(
Codec("DoubleDelta"),
Codec("DoubleDelta, LZ4"),
Codec("DoubleDelta, ZSTD"),
Codec("Gorilla"),
Codec("Gorilla, LZ4"),
Codec("Gorilla, ZSTD")
);
///////////////////////////////////////////////////////////////////////////////////////////////////
// test cases
///////////////////////////////////////////////////////////////////////////////////////////////////
INSTANTIATE_TEST_CASE_P(Simple,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
makeSeq<Float64>(1, 2, 3, 5, 7, 11, 13, 17, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97)
)
),
);
INSTANTIATE_TEST_CASE_P(SmallSequences,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::ValuesIn(
generatePyramidOfSequences<Int8 >(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<Int16 >(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<Int32 >(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<Int64 >(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<UInt8 >(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<UInt16>(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<UInt32>(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<UInt64>(42, G(SequentialGenerator(1)))
)
),
);
INSTANTIATE_TEST_CASE_P(Mixed, INSTANTIATE_TEST_CASE_P(Mixed,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<Int32, 1, 3>(G(MinMaxGenerator)) + generateParam<Int32, 1, 11>(G(SequentialGenerator(1))).setRatio(1), DefaultCodecsToTest,
generateParam<UInt32, 1, 3>(G(MinMaxGenerator)) + generateParam<UInt32, 1, 11>(G(SequentialGenerator(1))).setRatio(1), ::testing::Values(
generateParam<Int64, 1, 3>(G(MinMaxGenerator)) + generateParam<Int64, 1, 11>(G(SequentialGenerator(1))).setRatio(1), generateSeq<Int8>(G(MinMaxGenerator()), 1, 5) + generateSeq<Int8>(G(SequentialGenerator(1)), 1, 1001),
generateParam<UInt64, 1, 3>(G(MinMaxGenerator)) + generateParam<UInt64, 1, 11>(G(SequentialGenerator(1))).setRatio(1) generateSeq<Int16>(G(MinMaxGenerator()), 1, 5) + generateSeq<Int16>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<Int32>(G(MinMaxGenerator()), 1, 5) + generateSeq<Int32>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<Int64>(G(MinMaxGenerator()), 1, 5) + generateSeq<Int64>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<UInt8>(G(MinMaxGenerator()), 1, 5) + generateSeq<UInt8>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<UInt16>(G(MinMaxGenerator()), 1, 5) + generateSeq<UInt16>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<UInt32>(G(MinMaxGenerator()), 1, 5) + generateSeq<UInt32>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<UInt64>(G(MinMaxGenerator()), 1, 5) + generateSeq<UInt64>(G(SequentialGenerator(1)), 1, 1001)
)
), ),
); );
INSTANTIATE_TEST_CASE_P(Same, INSTANTIATE_TEST_CASE_P(SameValueInt,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<UInt32>(G(SameValueGenerator(1000))), DefaultCodecsToTest,
generateParam<Int32>(G(SameValueGenerator(-1000))), ::testing::Values(
generateParam<UInt64>(G(SameValueGenerator(1000))), generateSeq<Int8 >(G(SameValueGenerator(1000))),
generateParam<Int64>(G(SameValueGenerator(-1000))), generateSeq<Int16 >(G(SameValueGenerator(1000))),
generateParam<Float32>(G(SameValueGenerator(M_E))), generateSeq<Int32 >(G(SameValueGenerator(1000))),
generateParam<Float64>(G(SameValueGenerator(M_E))) generateSeq<Int64 >(G(SameValueGenerator(1000))),
generateSeq<UInt8 >(G(SameValueGenerator(1000))),
generateSeq<UInt16>(G(SameValueGenerator(1000))),
generateSeq<UInt32>(G(SameValueGenerator(1000))),
generateSeq<UInt64>(G(SameValueGenerator(1000)))
)
), ),
); );
INSTANTIATE_TEST_CASE_P(Sequential, INSTANTIATE_TEST_CASE_P(SameNegativeValueInt,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<UInt32>(G(SequentialGenerator(1))), DefaultCodecsToTest,
generateParam<Int32>(G(SequentialGenerator(-1))), ::testing::Values(
generateParam<UInt64>(G(SequentialGenerator(1))), generateSeq<Int8 >(G(SameValueGenerator(-1000))),
generateParam<Int64>(G(SequentialGenerator(-1))), generateSeq<Int16 >(G(SameValueGenerator(-1000))),
generateParam<Float32>(G(SequentialGenerator(M_E))), generateSeq<Int32 >(G(SameValueGenerator(-1000))),
generateParam<Float64>(G(SequentialGenerator(M_E))) generateSeq<Int64 >(G(SameValueGenerator(-1000))),
generateSeq<UInt8 >(G(SameValueGenerator(-1000))),
generateSeq<UInt16>(G(SameValueGenerator(-1000))),
generateSeq<UInt32>(G(SameValueGenerator(-1000))),
generateSeq<UInt64>(G(SameValueGenerator(-1000)))
)
), ),
); );
INSTANTIATE_TEST_CASE_P(Monotonic, INSTANTIATE_TEST_CASE_P(SameValueFloat,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<UInt32>(G(MonotonicGenerator<UInt32>(1, 5))), ::testing::Values(
generateParam<Int32>(G(MonotonicGenerator<Int32>(-1, 5))), Codec("Gorilla"),
generateParam<UInt64>(G(MonotonicGenerator<UInt64>(1, 5))), Codec("Gorilla, LZ4")
generateParam<Int64>(G(MonotonicGenerator<Int64>(-1, 5))), ),
generateParam<Float32>(G(MonotonicGenerator<Float32>(M_E, 5))), ::testing::Values(
generateParam<Float64>(G(MonotonicGenerator<Float64>(M_E, 5))) generateSeq<Float32>(G(SameValueGenerator(M_E))),
generateSeq<Float64>(G(SameValueGenerator(M_E)))
)
), ),
); );
INSTANTIATE_TEST_CASE_P(Random, INSTANTIATE_TEST_CASE_P(SameNegativeValueFloat,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<UInt32>(G(RandomGenerator<UInt32>(0, 1000'000'000))).setRatio(1.2), ::testing::Values(
generateParam<UInt64>(G(RandomGenerator<UInt64>(0, 1000'000'000))).setRatio(1.1) Codec("Gorilla"),
Codec("Gorilla, LZ4")
),
::testing::Values(
generateSeq<Float32>(G(SameValueGenerator(-1 * M_E))),
generateSeq<Float64>(G(SameValueGenerator(-1 * M_E)))
)
), ),
); );
INSTANTIATE_TEST_CASE_P(Randomish, INSTANTIATE_TEST_CASE_P(SequentialInt,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<Int32>(G(RandomishGenerator)).setRatio(1.1), DefaultCodecsToTest,
generateParam<Int64>(G(RandomishGenerator)).setRatio(1.1), ::testing::Values(
generateParam<UInt32>(G(RandomishGenerator)).setRatio(1.1), generateSeq<Int8 >(G(SequentialGenerator(1))),
generateParam<UInt64>(G(RandomishGenerator)).setRatio(1.1), generateSeq<Int16 >(G(SequentialGenerator(1))),
generateParam<Float32>(G(RandomishGenerator)).setRatio(1.1), generateSeq<Int32 >(G(SequentialGenerator(1))),
generateParam<Float64>(G(RandomishGenerator)).setRatio(1.1) generateSeq<Int64 >(G(SequentialGenerator(1))),
generateSeq<UInt8 >(G(SequentialGenerator(1))),
generateSeq<UInt16>(G(SequentialGenerator(1))),
generateSeq<UInt32>(G(SequentialGenerator(1))),
generateSeq<UInt64>(G(SequentialGenerator(1)))
)
), ),
); );
INSTANTIATE_TEST_CASE_P(Overflow, // -1, -2, -3, ... etc for signed
// 0xFF, 0xFE, 0xFD, ... for unsigned
INSTANTIATE_TEST_CASE_P(SequentialReverseInt,
CodecTest, CodecTest,
::testing::Values( ::testing::Combine(
generateParam<UInt32>(G(MinMaxGenerator)), DefaultCodecsToTest,
generateParam<Int32>(G(MinMaxGenerator)), ::testing::Values(
generateParam<UInt64>(G(MinMaxGenerator)), generateSeq<Int8 >(G(SequentialGenerator(-1))),
generateParam<Int64>(G(MinMaxGenerator)) generateSeq<Int16 >(G(SequentialGenerator(-1))),
generateSeq<Int32 >(G(SequentialGenerator(-1))),
generateSeq<Int64 >(G(SequentialGenerator(-1))),
generateSeq<UInt8 >(G(SequentialGenerator(-1))),
generateSeq<UInt16>(G(SequentialGenerator(-1))),
generateSeq<UInt32>(G(SequentialGenerator(-1))),
generateSeq<UInt64>(G(SequentialGenerator(-1)))
)
), ),
); );
INSTANTIATE_TEST_CASE_P(SequentialFloat,
CodecTest,
::testing::Combine(
::testing::Values(
Codec("Gorilla"),
Codec("Gorilla, LZ4")
),
::testing::Values(
generateSeq<Float32>(G(SequentialGenerator(M_E))),
generateSeq<Float64>(G(SequentialGenerator(M_E)))
)
),
);
INSTANTIATE_TEST_CASE_P(SequentialReverseFloat,
CodecTest,
::testing::Combine(
::testing::Values(
Codec("Gorilla"),
Codec("Gorilla, LZ4")
),
::testing::Values(
generateSeq<Float32>(G(SequentialGenerator(-1 * M_E))),
generateSeq<Float64>(G(SequentialGenerator(-1 * M_E)))
)
),
);
INSTANTIATE_TEST_CASE_P(MonotonicInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
generateSeq<Int8 >(G(MonotonicGenerator(1, 5))),
generateSeq<Int16 >(G(MonotonicGenerator(1, 5))),
generateSeq<Int32 >(G(MonotonicGenerator(1, 5))),
generateSeq<Int64 >(G(MonotonicGenerator(1, 5))),
generateSeq<UInt8 >(G(MonotonicGenerator(1, 5))),
generateSeq<UInt16>(G(MonotonicGenerator(1, 5))),
generateSeq<UInt32>(G(MonotonicGenerator(1, 5))),
generateSeq<UInt64>(G(MonotonicGenerator(1, 5)))
)
),
);
INSTANTIATE_TEST_CASE_P(MonotonicReverseInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
generateSeq<Int8 >(G(MonotonicGenerator(-1, 5))),
generateSeq<Int16 >(G(MonotonicGenerator(-1, 5))),
generateSeq<Int32 >(G(MonotonicGenerator(-1, 5))),
generateSeq<Int64 >(G(MonotonicGenerator(-1, 5))),
generateSeq<UInt8 >(G(MonotonicGenerator(-1, 5))),
generateSeq<UInt16>(G(MonotonicGenerator(-1, 5))),
generateSeq<UInt32>(G(MonotonicGenerator(-1, 5))),
generateSeq<UInt64>(G(MonotonicGenerator(-1, 5)))
)
),
);
INSTANTIATE_TEST_CASE_P(MonotonicFloat,
CodecTest,
::testing::Combine(
::testing::Values(
Codec("Gorilla")
),
::testing::Values(
generateSeq<Float32>(G(MonotonicGenerator<Float32>(M_E, 5))),
generateSeq<Float64>(G(MonotonicGenerator<Float64>(M_E, 5)))
)
),
);
INSTANTIATE_TEST_CASE_P(MonotonicReverseFloat,
CodecTest,
::testing::Combine(
::testing::Values(
Codec("Gorilla")
),
::testing::Values(
generateSeq<Float32>(G(MonotonicGenerator<Float32>(-1 * M_E, 5))),
generateSeq<Float64>(G(MonotonicGenerator<Float64>(-1 * M_E, 5)))
)
),
);
INSTANTIATE_TEST_CASE_P(RandomInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
generateSeq<UInt8 >(G(RandomGenerator<UInt8>(0))),
generateSeq<UInt16>(G(RandomGenerator<UInt16>(0))),
generateSeq<UInt32>(G(RandomGenerator<UInt32>(0, 0, 1000'000'000))),
generateSeq<UInt64>(G(RandomGenerator<UInt64>(0, 0, 1000'000'000)))
)
),
);
INSTANTIATE_TEST_CASE_P(RandomishInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
generateSeq<Int32>(G(RandomishGenerator)),
generateSeq<Int64>(G(RandomishGenerator)),
generateSeq<UInt32>(G(RandomishGenerator)),
generateSeq<UInt64>(G(RandomishGenerator)),
generateSeq<Float32>(G(RandomishGenerator)),
generateSeq<Float64>(G(RandomishGenerator))
)
),
);
INSTANTIATE_TEST_CASE_P(RandomishFloat,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
generateSeq<Float32>(G(RandomishGenerator)),
generateSeq<Float64>(G(RandomishGenerator))
)
),
);
// Double delta overflow case, deltas are out of bounds for target type
INSTANTIATE_TEST_CASE_P(OverflowInt,
CodecTest,
::testing::Combine(
::testing::Values(
Codec("DoubleDelta", 1.2),
Codec("DoubleDelta, LZ4", 1.0)
),
::testing::Values(
generateSeq<UInt32>(G(MinMaxGenerator())),
generateSeq<Int32>(G(MinMaxGenerator())),
generateSeq<UInt64>(G(MinMaxGenerator())),
generateSeq<Int64>(G(MinMaxGenerator()))
)
),
);
INSTANTIATE_TEST_CASE_P(OverflowFloat,
CodecTest,
::testing::Combine(
::testing::Values(
Codec("Gorilla", 1.1),
Codec("Gorilla, LZ4", 1.0)
),
::testing::Values(
generateSeq<Float32>(G(MinMaxGenerator())),
generateSeq<Float64>(G(MinMaxGenerator())),
generateSeq<Float32>(G(FFand0Generator())),
generateSeq<Float64>(G(FFand0Generator()))
)
),
);
}

View File

@ -63,9 +63,9 @@ void ComplexKeyCacheDictionary::setAttributeValue(Attribute & attribute, const s
const auto str_size = string.size(); const auto str_size = string.size();
if (str_size != 0) if (str_size != 0)
{ {
auto string_ptr = string_arena->alloc(str_size + 1); auto str_ptr = string_arena->alloc(str_size);
std::copy(string.data(), string.data() + str_size + 1, string_ptr); std::copy(string.data(), string.data() + str_size, str_ptr);
string_ref = StringRef{string_ptr, str_size}; string_ref = StringRef{str_ptr, str_size};
} }
else else
string_ref = {}; string_ref = {};

View File

@ -43,7 +43,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{ {
/// nullIf(col1, col2) == if(col1 == col2, NULL, col1) /// nullIf(col1, col2) == if(col1 != col2, col1, NULL)
Block temp_block = block; Block temp_block = block;
@ -51,7 +51,7 @@ public:
temp_block.insert({nullptr, std::make_shared<DataTypeUInt8>(), ""}); temp_block.insert({nullptr, std::make_shared<DataTypeUInt8>(), ""});
{ {
auto equals_func = FunctionFactory::instance().get("equals", context)->build( auto equals_func = FunctionFactory::instance().get("notEquals", context)->build(
{temp_block.getByPosition(arguments[0]), temp_block.getByPosition(arguments[1])}); {temp_block.getByPosition(arguments[0]), temp_block.getByPosition(arguments[1])});
equals_func->execute(temp_block, {arguments[0], arguments[1]}, res_pos, input_rows_count); equals_func->execute(temp_block, {arguments[0], arguments[1]}, res_pos, input_rows_count);
} }
@ -69,7 +69,7 @@ public:
auto func_if = FunctionFactory::instance().get("if", context)->build( auto func_if = FunctionFactory::instance().get("if", context)->build(
{temp_block.getByPosition(res_pos), temp_block.getByPosition(null_pos), temp_block.getByPosition(arguments[0])}); {temp_block.getByPosition(res_pos), temp_block.getByPosition(null_pos), temp_block.getByPosition(arguments[0])});
func_if->execute(temp_block, {res_pos, null_pos, arguments[0]}, result, input_rows_count); func_if->execute(temp_block, {res_pos, arguments[0], null_pos}, result, input_rows_count);
block.getByPosition(result).column = std::move(temp_block.getByPosition(result).column); block.getByPosition(result).column = std::move(temp_block.getByPosition(result).column);
} }

View File

@ -39,7 +39,7 @@ Block TextLogElement::createBlock()
{std::move(priority_datatype), "level"}, {std::move(priority_datatype), "level"},
{std::make_shared<DataTypeString>(), "query_id"}, {std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeString>(), "logger_name"}, {std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "logger_name"},
{std::make_shared<DataTypeString>(), "message"}, {std::make_shared<DataTypeString>(), "message"},
{std::make_shared<DataTypeUInt32>(), "revision"}, {std::make_shared<DataTypeUInt32>(), "revision"},

View File

@ -576,7 +576,9 @@ public:
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0; virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
bool canUseAdaptiveGranularity() const /// Returns true if table can create new parts with adaptive granularity
/// Has additional constraint in replicated version
virtual bool canUseAdaptiveGranularity() const
{ {
return settings.index_granularity_bytes != 0 && return settings.index_granularity_bytes != 0 &&
(settings.enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts); (settings.enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
@ -632,7 +634,7 @@ public:
String sampling_expr_column_name; String sampling_expr_column_name;
Names columns_required_for_sampling; Names columns_required_for_sampling;
const MergeTreeSettings settings; MergeTreeSettings settings;
/// Limiting parallel sends per one table, used in DataPartsExchange /// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0}; std::atomic_uint current_table_sends {0};

View File

@ -297,6 +297,17 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
} }
createNewZooKeeperNodes(); createNewZooKeeperNodes();
other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
}
bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper()
{
auto zookeeper = getZooKeeper();
String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
return metadata_from_zk.index_granularity_bytes == 0;
} }
@ -5143,4 +5154,12 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const C
return results; return results;
} }
bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
{
return settings.index_granularity_bytes != 0 &&
(settings.enable_mixed_granularity_parts ||
(!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
}
} }

View File

@ -172,6 +172,9 @@ public:
CheckResults checkData(const ASTPtr & query, const Context & context) override; CheckResults checkData(const ASTPtr & query, const Context & context) override;
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;
private: private:
/// Delete old parts from disk and from ZooKeeper. /// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK(); void clearOldPartsAndRemoveFromZK();
@ -285,6 +288,9 @@ private:
/// An event that awakens `alter` method from waiting for the completion of the ALTER query. /// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>(); zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
/** Creates the minimum set of nodes in ZooKeeper. /** Creates the minimum set of nodes in ZooKeeper.
*/ */
void createTableIfNotExists(); void createTableIfNotExists();
@ -506,6 +512,10 @@ private:
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context); void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context);
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed
bool checkFixedGranualrityInZookeeper();
protected: protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/ */

View File

@ -0,0 +1,97 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.client import QueryRuntimeException, QueryTimeoutExceedException
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:19.1.14', with_installed_binary=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
time.sleep(1)
yield cluster
finally:
cluster.shutdown()
def test_creating_table_different_setting(start_cluster):
node1.query("CREATE TABLE t1 (c1 String, c2 String) ENGINE=ReplicatedMergeTree('/clickhouse/t1', '1') ORDER BY tuple(c1) SETTINGS index_granularity_bytes = 0")
node1.query("INSERT INTO t1 VALUES('x', 'y')")
node2.query("CREATE TABLE t1 (c1 String, c2 String) ENGINE=ReplicatedMergeTree('/clickhouse/t1', '2') ORDER BY tuple(c1)")
node1.query("INSERT INTO t1 VALUES('a', 'b')")
node2.query("SYSTEM SYNC REPLICA t1", timeout=5)
node1.query("SELECT count() FROM t1") == "2\n"
node2.query("SELECT count() FROM t1") == "1\n"
node2.query("INSERT INTO t1 VALUES('c', 'd')")
node1.query("SYSTEM SYNC REPLICA t1", timeout=5)
# replication works
node1.query("SELECT count() FROM t1") == "3\n"
node2.query("SELECT count() FROM t1") == "2\n"
# OPTIMIZE also works correctly
node2.query("OPTIMIZE TABLE t1 FINAL") == "3\n"
node1.query("SYSTEM SYNC REPLICA t1", timeout=5)
node1.query("SELECT count() FROM t1") == "3\n"
node2.query("SELECT count() FROM t1") == "2\n"
path_part = node1.query("SELECT path FROM system.parts WHERE table = 't1' AND active=1 ORDER BY partition DESC LIMIT 1").strip()
with pytest.raises(Exception): # check that we have no adaptive files
node1.exec_in_container(["bash", "-c", "find {p} -name '*.mrk2' | grep '.*'".format(p=path_part)])
path_part = node2.query("SELECT path FROM system.parts WHERE table = 't1' AND active=1 ORDER BY partition DESC LIMIT 1").strip()
with pytest.raises(Exception): # check that we have no adaptive files
node2.exec_in_container(["bash", "-c", "find {p} -name '*.mrk2' | grep '.*'".format(p=path_part)])
def test_old_node_with_new_node(start_cluster):
node3.query("CREATE TABLE t2 (c1 String, c2 String) ENGINE=ReplicatedMergeTree('/clickhouse/t2', '3') ORDER BY tuple(c1)")
node3.query("INSERT INTO t2 VALUES('x', 'y')")
node2.query("CREATE TABLE t2 (c1 String, c2 String) ENGINE=ReplicatedMergeTree('/clickhouse/t2', '2') ORDER BY tuple(c1)")
node3.query("INSERT INTO t2 VALUES('a', 'b')")
node2.query("SYSTEM SYNC REPLICA t2", timeout=5)
node3.query("SELECT count() FROM t2") == "2\n"
node2.query("SELECT count() FROM t2") == "1\n"
node2.query("INSERT INTO t2 VALUES('c', 'd')")
node3.query("SYSTEM SYNC REPLICA t2", timeout=5)
# replication works
node3.query("SELECT count() FROM t2") == "3\n"
node2.query("SELECT count() FROM t2") == "2\n"
# OPTIMIZE also works correctly
node3.query("OPTIMIZE table t2 FINAL")
node2.query("SYSTEM SYNC REPLICA t2", timeout=5)
node3.query("SELECT count() FROM t2") == "3\n"
node2.query("SELECT count() FROM t2") == "2\n"
path_part = node3.query("SELECT path FROM system.parts WHERE table = 't2' AND active=1 ORDER BY partition DESC LIMIT 1").strip()
with pytest.raises(Exception): # check that we have no adaptive files
node3.exec_in_container(["bash", "-c", "find {p} -name '*.mrk2' | grep '.*'".format(p=path_part)])
path_part = node2.query("SELECT path FROM system.parts WHERE table = 't2' AND active=1 ORDER BY partition DESC LIMIT 1").strip()
with pytest.raises(Exception): # check that we have no adaptive files
node2.exec_in_container(["bash", "-c", "find {p} -name '*.mrk2' | grep '.*'".format(p=path_part)])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,45 @@
<yandex>
<dictionary>
<name>radars</name>
<source>
<clickhouse>
<host>localhost</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>default</db>
<table>radars_table</table>
</clickhouse>
</source>
<structure>
<key>
<attribute>
<name>radar_id</name>
<type>String</type>
<hierarchical>False</hierarchical>
<injective>False</injective>
</attribute>
</key>
<attribute>
<name>radar_ip</name>
<type>String</type>
<null_value></null_value>
<hierarchical>False</hierarchical>
<injective>True</injective>
</attribute>
<attribute>
<name>client_id</name>
<type>String</type>
<null_value></null_value>
<hierarchical>False</hierarchical>
<injective>True</injective>
</attribute>
</structure>
<layout>
<complex_key_cache>
<size_in_cells>20</size_in_cells>
</complex_key_cache>
</layout>
<lifetime>1</lifetime>
</dictionary>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,51 @@
import pytest
import os
import time
from helpers.cluster import ClickHouseCluster
import random
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=['configs/dictionaries/complex_key_cache_string.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node.query("create table radars_table (radar_id String, radar_ip String, client_id String) engine=MergeTree() order by radar_id")
yield cluster
finally:
cluster.shutdown()
def test_memory_consumption(started_cluster):
node.query("insert into radars_table select toString(rand() % 5000), '{0}', '{0}' from numbers(1000)".format('w' * 8))
node.query("insert into radars_table select toString(rand() % 5000), '{0}', '{0}' from numbers(1000)".format('x' * 16))
node.query("insert into radars_table select toString(rand() % 5000), '{0}', '{0}' from numbers(1000)".format('y' * 32))
node.query("insert into radars_table select toString(rand() % 5000), '{0}', '{0}' from numbers(1000)".format('z' * 64))
# Fill dictionary
node.query("select dictGetString('radars', 'client_id', tuple(toString(number))) from numbers(0, 5000)")
allocated_first = int(node.query("select bytes_allocated from system.dictionaries where name = 'radars'").strip())
alloc_array = []
for i in xrange(5):
node.query("select dictGetString('radars', 'client_id', tuple(toString(number))) from numbers(0, 5000)")
allocated = int(node.query("select bytes_allocated from system.dictionaries where name = 'radars'").strip())
alloc_array.append(allocated)
# size doesn't grow
assert all(allocated_first >= a for a in alloc_array)
for i in xrange(5):
node.query("select dictGetString('radars', 'client_id', tuple(toString(number))) from numbers(0, 5000)")
allocated = int(node.query("select bytes_allocated from system.dictionaries where name = 'radars'").strip())
alloc_array.append(allocated)
# size doesn't grow
assert all(allocated_first >= a for a in alloc_array)

View File

@ -70,6 +70,7 @@
42 42 \N 42 42 \N
\N 6 \N \N 6 \N
\N \N \N \N \N \N
1
----- coalesce ----- ----- coalesce -----
\N \N
1 1

View File

@ -125,6 +125,7 @@ SELECT '----- ifNull, nullIf -----';
SELECT col1, col2, ifNull(col1,col2) FROM test1_00395 ORDER BY col1,col2 ASC; SELECT col1, col2, ifNull(col1,col2) FROM test1_00395 ORDER BY col1,col2 ASC;
SELECT col1, col2, nullIf(col1,col2) FROM test1_00395 ORDER BY col1,col2 ASC; SELECT col1, col2, nullIf(col1,col2) FROM test1_00395 ORDER BY col1,col2 ASC;
SELECT nullIf(1, NULL);
SELECT '----- coalesce -----'; SELECT '----- coalesce -----';

View File

@ -23,6 +23,8 @@ def concatenate(lang, docs_path, single_page_file):
logging.debug('Concatenating: ' + ', '.join(files_to_concatenate)) logging.debug('Concatenating: ' + ', '.join(files_to_concatenate))
for path in files_to_concatenate: for path in files_to_concatenate:
if path.endswith('introduction/info.md'):
continue
try: try:
with open(os.path.join(lang_path, path)) as f: with open(os.path.join(lang_path, path)) as f:
anchors = set() anchors = set()

View File

@ -58,13 +58,11 @@ void OwnSplitChannel::log(const Poco::Message & msg)
elem.thread_name = getThreadName(); elem.thread_name = getThreadName();
elem.thread_number = msg_ext.thread_number; elem.thread_number = msg_ext.thread_number;
try
{ if (CurrentThread::isInitialized())
elem.os_thread_id = CurrentThread::get().os_thread_id; elem.os_thread_id = CurrentThread::get().os_thread_id;
} catch (...) else
{
elem.os_thread_id = 0; elem.os_thread_id = 0;
}
elem.query_id = msg_ext.query_id; elem.query_id = msg_ext.query_id;

View File

@ -94,7 +94,7 @@
</div> </div>
<div id="announcement" class="colored-block"> <div id="announcement" class="colored-block">
<div class="page"> <div class="page">
Upcoming Meetups: <a class="announcement-link" href="https://www.eventbrite.com/e/meetup-clickhouse-in-the-south-bay-registration-65935505873" rel="external nofollow" target="_blank">Mountain View</a> on August 13, <a class="announcement-link" href="https://yandex.ru/promo/clickhouse/moscow-2019" rel="external nofollow" target="_blank">Moscow</a> on September 5, <a class="announcement-link" href="https://www.huodongxing.com/event/3483759917300" rel="external nofollow" target="_blank">Shenzhen</a> on October 20 and <a class="announcement-link" href="https://www.huodongxing.com/event/4483760336000" rel="external nofollow" target="_blank">Shanghai</a> on October 27 Upcoming Meetups: <a class="announcement-link" href="https://yandex.ru/promo/clickhouse/moscow-2019" rel="external nofollow" target="_blank">Moscow</a> on September 5, <a class="announcement-link" href="https://www.meetup.com/Hong-Kong-Machine-Learning-Meetup/events/263580542/" rel="external nofollow" target="_blank">Hong Kong</a> on October 17, <a class="announcement-link" href="https://www.huodongxing.com/event/3483759917300" rel="external nofollow" target="_blank">Shenzhen</a> on October 20 and <a class="announcement-link" href="https://www.huodongxing.com/event/4483760336000" rel="external nofollow" target="_blank">Shanghai</a> on October 27
</div> </div>
</div> </div>
<div class="page"> <div class="page">