Merge branch 'master' into CLICKHOUSE-2931

This commit is contained in:
alexey-milovidov 2017-08-31 00:23:39 +03:00 committed by GitHub
commit 5d14855989
87 changed files with 887 additions and 325 deletions

3
.gitmodules vendored
View File

@ -1,3 +0,0 @@
[submodule "doc/presentations"]
path = website/presentations
url = https://github.com/yandex/clickhouse-presentations.git

View File

@ -1,3 +1,7 @@
# ClickHouse release 1.1.54284
* This is bugfix release for previous 1.1.54282 release. It fixes ZooKeeper nodes leak in `parts/` directory.
# ClickHouse release 1.1.54282
This is a bugfix release. The following bugs were fixed:

View File

@ -1,3 +1,7 @@
# Релиз ClickHouse 1.1.54284
* Релиз содержит изменения к предыдущему релизу 1.1.54282, которые исправляют утечку записей о кусках в ZooKeeper
# Релиз ClickHouse 1.1.54282
Релиз содержит исправления к предыдущему релизу 1.1.54276:

View File

@ -23,7 +23,7 @@ struct AggregateFunctionAvgData
/// Calculates arithmetic mean of numbers.
template <typename T>
class AggregateFunctionAvg final : public IUnaryAggregateFunction<AggregateFunctionAvgData<typename NearestFieldType<T>::Type>, AggregateFunctionAvg<T> >
class AggregateFunctionAvg final : public IUnaryAggregateFunction<AggregateFunctionAvgData<typename NearestFieldType<T>::Type>, AggregateFunctionAvg<T>>
{
public:
String getName() const override { return "avg"; }

View File

@ -0,0 +1,39 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionBitwise.h>
#include <AggregateFunctions/Helpers.h>
namespace DB
{
namespace
{
template <template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionBitwise(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionBitwise, Data>(*argument_types[0]));
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
}
void registerAggregateFunctionsBitwise(AggregateFunctionFactory & factory)
{
factory.registerFunction("groupBitOr", createAggregateFunctionBitwise<AggregateFunctionGroupBitOrData>);
factory.registerFunction("groupBitAnd", createAggregateFunctionBitwise<AggregateFunctionGroupBitAndData>);
factory.registerFunction("groupBitXor", createAggregateFunctionBitwise<AggregateFunctionGroupBitXorData>);
/// Aliases for compatibility with MySQL.
factory.registerFunction("BIT_OR", createAggregateFunctionBitwise<AggregateFunctionGroupBitOrData>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("BIT_AND", createAggregateFunctionBitwise<AggregateFunctionGroupBitAndData>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("BIT_XOR", createAggregateFunctionBitwise<AggregateFunctionGroupBitXorData>, AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,87 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnVector.h>
#include <AggregateFunctions/IUnaryAggregateFunction.h>
namespace DB
{
template <typename T>
struct AggregateFunctionGroupBitOrData
{
T value = 0;
static const char * name() { return "groupBitOr"; }
void update(T x) { value |= x; }
};
template <typename T>
struct AggregateFunctionGroupBitAndData
{
T value = -1; /// Two's complement arithmetic, sign extension.
static const char * name() { return "groupBitAnd"; }
void update(T x) { value &= x; }
};
template <typename T>
struct AggregateFunctionGroupBitXorData
{
T value = 0;
static const char * name() { return "groupBitXor"; }
void update(T x) { value ^= x; }
};
/// Counts bitwise operation on numbers.
template <typename T, typename Data>
class AggregateFunctionBitwise final : public IUnaryAggregateFunction<Data, AggregateFunctionBitwise<T, Data>>
{
public:
String getName() const override { return Data::name(); }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeNumber<T>>();
}
void setArgument(const DataTypePtr & argument)
{
if (!argument->behavesAsNumber())
throw Exception("Illegal type " + argument->getName() + " of argument for aggregate function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).update(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).update(this->data(rhs).value);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeBinary(this->data(place).value, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readBinary(this->data(place).value, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).value);
}
};
}

View File

@ -680,7 +680,7 @@ struct AggregateFunctionAnyHeavyData : Data
template <typename Data>
class AggregateFunctionsSingleValue final : public IUnaryAggregateFunction<Data, AggregateFunctionsSingleValue<Data> >
class AggregateFunctionsSingleValue final : public IUnaryAggregateFunction<Data, AggregateFunctionsSingleValue<Data>>
{
private:
DataTypePtr type;

View File

@ -34,7 +34,7 @@ struct AggregateFunctionQuantileData
*/
template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantile final
: public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantile<ArgumentFieldType, returns_float> >
: public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantile<ArgumentFieldType, returns_float>>
{
private:
using Sample = typename AggregateFunctionQuantileData<ArgumentFieldType>::Sample;
@ -108,7 +108,7 @@ public:
*/
template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantiles final
: public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantiles<ArgumentFieldType, returns_float> >
: public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantiles<ArgumentFieldType, returns_float>>
{
private:
using Sample = typename AggregateFunctionQuantileData<ArgumentFieldType>::Sample;

View File

@ -790,7 +790,7 @@ public:
template <typename ArgumentFieldType>
class AggregateFunctionQuantileTiming final : public IUnaryAggregateFunction<QuantileTiming, AggregateFunctionQuantileTiming<ArgumentFieldType> >
class AggregateFunctionQuantileTiming final : public IUnaryAggregateFunction<QuantileTiming, AggregateFunctionQuantileTiming<ArgumentFieldType>>
{
private:
double level;
@ -910,7 +910,7 @@ public:
* Returns an array of results.
*/
template <typename ArgumentFieldType>
class AggregateFunctionQuantilesTiming final : public IUnaryAggregateFunction<QuantileTiming, AggregateFunctionQuantilesTiming<ArgumentFieldType> >
class AggregateFunctionQuantilesTiming final : public IUnaryAggregateFunction<QuantileTiming, AggregateFunctionQuantilesTiming<ArgumentFieldType>>
{
private:
QuantileLevels<double> levels;

View File

@ -112,7 +112,7 @@ private:
template<typename T, typename Op>
class AggregateFunctionVariance final
: public IUnaryAggregateFunction<AggregateFunctionVarianceData<T, Op>,
AggregateFunctionVariance<T, Op> >
AggregateFunctionVariance<T, Op>>
{
public:
String getName() const override { return Op::name; }
@ -371,7 +371,7 @@ template<typename T, typename U, typename Op, bool compute_marginal_moments = fa
class AggregateFunctionCovariance final
: public IBinaryAggregateFunction<
CovarianceData<T, U, Op, compute_marginal_moments>,
AggregateFunctionCovariance<T, U, Op, compute_marginal_moments> >
AggregateFunctionCovariance<T, U, Op, compute_marginal_moments>>
{
public:
String getName() const override { return Op::name; }

View File

@ -21,7 +21,7 @@ struct AggregateFunctionSumData
/// Counts the sum of the numbers.
template <typename T>
class AggregateFunctionSum final : public IUnaryAggregateFunction<AggregateFunctionSumData<typename NearestFieldType<T>::Type>, AggregateFunctionSum<T> >
class AggregateFunctionSum final : public IUnaryAggregateFunction<AggregateFunctionSumData<typename NearestFieldType<T>::Type>, AggregateFunctionSum<T>>
{
public:
String getName() const override { return "sum"; }

View File

@ -122,7 +122,7 @@ struct BaseUniqCombinedData
using Key = UInt32;
using Set = CombinedCardinalityEstimator<
Key,
HashSet<Key, TrivialHash, HashTableGrower<> >,
HashSet<Key, TrivialHash, HashTableGrower<>>,
16,
14,
17,
@ -141,7 +141,7 @@ struct BaseUniqCombinedData<String, mode>
using Key = UInt64;
using Set = CombinedCardinalityEstimator<
Key,
HashSet<Key, TrivialHash, HashTableGrower<> >,
HashSet<Key, TrivialHash, HashTableGrower<>>,
16,
14,
17,
@ -252,7 +252,7 @@ struct OneAdder;
template <typename T, typename Data>
struct OneAdder<T, Data, typename std::enable_if<
std::is_same<Data, AggregateFunctionUniqUniquesHashSetData>::value ||
std::is_same<Data, AggregateFunctionUniqHLL12Data<T> >::value>::type>
std::is_same<Data, AggregateFunctionUniqHLL12Data<T>>::value>::type>
{
template <typename T2 = T>
static void addImpl(Data & data, const IColumn & column, size_t row_num,
@ -273,10 +273,10 @@ struct OneAdder<T, Data, typename std::enable_if<
template <typename T, typename Data>
struct OneAdder<T, Data, typename std::enable_if<
std::is_same<Data, AggregateFunctionUniqCombinedRawData<T> >::value ||
std::is_same<Data, AggregateFunctionUniqCombinedLinearCountingData<T> >::value ||
std::is_same<Data, AggregateFunctionUniqCombinedBiasCorrectedData<T> >::value ||
std::is_same<Data, AggregateFunctionUniqCombinedData<T> >::value>::type>
std::is_same<Data, AggregateFunctionUniqCombinedRawData<T>>::value ||
std::is_same<Data, AggregateFunctionUniqCombinedLinearCountingData<T>>::value ||
std::is_same<Data, AggregateFunctionUniqCombinedBiasCorrectedData<T>>::value ||
std::is_same<Data, AggregateFunctionUniqCombinedData<T>>::value>::type>
{
template <typename T2 = T>
static void addImpl(Data & data, const IColumn & column, size_t row_num,
@ -297,7 +297,7 @@ struct OneAdder<T, Data, typename std::enable_if<
template <typename T, typename Data>
struct OneAdder<T, Data, typename std::enable_if<
std::is_same<Data, AggregateFunctionUniqExactData<T> >::value>::type>
std::is_same<Data, AggregateFunctionUniqExactData<T>>::value>::type>
{
template <typename T2 = T>
static void addImpl(Data & data, const IColumn & column, size_t row_num,
@ -326,7 +326,7 @@ struct OneAdder<T, Data, typename std::enable_if<
/// Calculates the number of different values approximately or exactly.
template <typename T, typename Data>
class AggregateFunctionUniq final : public IUnaryAggregateFunction<Data, AggregateFunctionUniq<T, Data> >
class AggregateFunctionUniq final : public IUnaryAggregateFunction<Data, AggregateFunctionUniq<T, Data>>
{
public:
String getName() const override { return Data::getName(); }

View File

@ -116,7 +116,7 @@ struct AggregateFunctionUniqUpToData<String> : AggregateFunctionUniqUpToData<UIn
constexpr UInt8 uniq_upto_max_threshold = 100;
template <typename T>
class AggregateFunctionUniqUpTo final : public IUnaryAggregateFunction<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T> >
class AggregateFunctionUniqUpTo final : public IUnaryAggregateFunction<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T>>
{
private:
UInt8 threshold = 5; /// Default value if the parameter is not specified.

View File

@ -18,18 +18,18 @@ namespace DB
template <template <typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64>;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8>;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16>;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32>;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64>;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32>;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64>;
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8>;
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16>;
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64>;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8>;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16>;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32>;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64>;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32>;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64>;
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8>;
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16>;
else
return nullptr;
}
@ -37,18 +37,18 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
template <template <typename, typename> class AggregateFunctionTemplate, class Data>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data>;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8, Data>;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16, Data>;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32, Data>;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64, Data>;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32, Data>;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64, Data>;
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>;
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>;
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data>;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8, Data>;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16, Data>;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32, Data>;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64, Data>;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32, Data>;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64, Data>;
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>;
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>;
else
return nullptr;
}
@ -56,18 +56,18 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
template <template <typename, typename> class AggregateFunctionTemplate, class Data, typename ... TArgs>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type, TArgs && ... args)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>(std::forward<TArgs>(args)...);
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
else
return nullptr;
}
@ -75,18 +75,30 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data<UInt8> >;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data<UInt16> >;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data<UInt32> >;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data<UInt64> >;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8, Data<Int8> >;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16, Data<Int16> >;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32, Data<Int32> >;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64, Data<Int64> >;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32, Data<Float32> >;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64, Data<Float64> >;
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data<UInt8> >;
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data<UInt16> >;
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data<UInt16>>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data<UInt32>>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data<UInt64>>;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8, Data<Int8>>;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16, Data<Int16>>;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32, Data<Int32>>;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64, Data<Int64>>;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32, Data<Float32>>;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64, Data<Float64>>;
else if (typeid_cast<const DataTypeEnum8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>;
else if (typeid_cast<const DataTypeEnum16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data<UInt16>>;
else
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data>
static IAggregateFunction * createWithUnsignedIntegerType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16, Data<UInt16>>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32, Data<UInt32>>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64, Data<UInt64>>;
else
return nullptr;
}
@ -97,18 +109,18 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & second_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt8>;
else if (typeid_cast<const DataTypeUInt16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt16>;
else if (typeid_cast<const DataTypeUInt32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt32>;
else if (typeid_cast<const DataTypeUInt64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt64>;
else if (typeid_cast<const DataTypeInt8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int8>;
else if (typeid_cast<const DataTypeInt16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int16>;
else if (typeid_cast<const DataTypeInt32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int32>;
else if (typeid_cast<const DataTypeInt64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int64>;
else if (typeid_cast<const DataTypeFloat32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Float32>;
else if (typeid_cast<const DataTypeFloat64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Float64>;
else if (typeid_cast<const DataTypeEnum8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt8>;
else if (typeid_cast<const DataTypeEnum16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt16>;
if (typeid_cast<const DataTypeUInt8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt8>;
else if (typeid_cast<const DataTypeUInt16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt16>;
else if (typeid_cast<const DataTypeUInt32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt32>;
else if (typeid_cast<const DataTypeUInt64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt64>;
else if (typeid_cast<const DataTypeInt8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int8>;
else if (typeid_cast<const DataTypeInt16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int16>;
else if (typeid_cast<const DataTypeInt32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int32>;
else if (typeid_cast<const DataTypeInt64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int64>;
else if (typeid_cast<const DataTypeFloat32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Float32>;
else if (typeid_cast<const DataTypeFloat64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Float64>;
else if (typeid_cast<const DataTypeEnum8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt8>;
else if (typeid_cast<const DataTypeEnum16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt16>;
else
return nullptr;
}
@ -116,18 +128,18 @@ static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & se
template <template <typename, typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_type, const IDataType & second_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt16 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt16, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt32 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt64 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt8 *>(&first_type)) return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt16 *>(&first_type)) return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt32 *>(&first_type)) return createWithTwoNumericTypesSecond<Int32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt64 *>(&first_type)) return createWithTwoNumericTypesSecond<Int64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeFloat32 *>(&first_type)) return createWithTwoNumericTypesSecond<Float32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeFloat64 *>(&first_type)) return createWithTwoNumericTypesSecond<Float64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeEnum8 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeEnum16 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt16, AggregateFunctionTemplate>(second_type);
if (typeid_cast<const DataTypeUInt8 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt16 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt16, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt32 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt64 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt8 *>(&first_type)) return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt16 *>(&first_type)) return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt32 *>(&first_type)) return createWithTwoNumericTypesSecond<Int32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt64 *>(&first_type)) return createWithTwoNumericTypesSecond<Int64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeFloat32 *>(&first_type)) return createWithTwoNumericTypesSecond<Float32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeFloat64 *>(&first_type)) return createWithTwoNumericTypesSecond<Float64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeEnum8 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeEnum16 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt16, AggregateFunctionTemplate>(second_type);
else
return nullptr;
}

View File

@ -47,7 +47,7 @@ struct NanLikeValueConstructor<ResultType, false>
}
};
template<typename T, ReservoirSamplerOnEmpty::Enum OnEmpty = ReservoirSamplerOnEmpty::THROW, typename Comparer = std::less<T> >
template<typename T, ReservoirSamplerOnEmpty::Enum OnEmpty = ReservoirSamplerOnEmpty::THROW, typename Comparer = std::less<T>>
class ReservoirSampler
{
public:

View File

@ -23,6 +23,7 @@ void registerAggregateFunctionSum(AggregateFunctionFactory & factory);
void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory);
void registerAggregateFunctionUniqUpTo(AggregateFunctionFactory & factory);
void registerAggregateFunctionTopK(AggregateFunctionFactory & factory);
void registerAggregateFunctionsBitwise(AggregateFunctionFactory & factory);
void registerAggregateFunctionDebug(AggregateFunctionFactory & factory);
@ -48,6 +49,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsUniq(factory);
registerAggregateFunctionUniqUpTo(factory);
registerAggregateFunctionTopK(factory);
registerAggregateFunctionsBitwise(factory);
registerAggregateFunctionDebug(factory);
}

View File

@ -75,7 +75,7 @@ using NodeListPtr = Poco::AutoPtr<Poco::XML::NodeList>;
static ElementIdentifier getElementIdentifier(Node * element)
{
NamedNodeMapPtr attrs = element->attributes();
std::vector<std::pair<std::string, std::string> > attrs_kv;
std::vector<std::pair<std::string, std::string>> attrs_kv;
for (size_t i = 0; i < attrs->length(); ++i)
{
Node * node = attrs->item(i);

View File

@ -29,7 +29,7 @@ using XMLDocumentPtr = Poco::AutoPtr<Poco::XML::Document>;
class ConfigProcessor
{
public:
using Substitutions = std::vector<std::pair<std::string, std::string> >;
using Substitutions = std::vector<std::pair<std::string, std::string>>;
/// Set log_to_console to true if the logging subsystem is not initialized yet.
ConfigProcessor(bool throw_on_bad_incl = false, bool log_to_console = false, const Substitutions & substitutions = Substitutions());

View File

@ -35,7 +35,7 @@ public:
std::string format; /// Name of the data storage format
/// Description of the table structure: (column name, data type name)
std::vector<std::pair<std::string, std::string> > structure;
std::vector<std::pair<std::string, std::string>> structure;
std::unique_ptr<ReadBuffer> read_buffer;
Block sample_block;

View File

@ -19,7 +19,7 @@ public:
using mapped_type = typename Cell::Mapped;
using value_type = typename Cell::value_type;
using TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, HashMapTable<Key, Cell, Hash, Grower, Allocator> >::TwoLevelHashTable;
using TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, HashMapTable<Key, Cell, Hash, Grower, Allocator>>::TwoLevelHashTable;
mapped_type & ALWAYS_INLINE operator[](Key x)
{

View File

@ -28,7 +28,7 @@ struct TrivialWeightFunction
/// Cache starts to evict entries when their total weight exceeds max_size and when expiration time of these
/// entries is due.
/// Value weight should not change after insertion.
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TMapped>, typename WeightFunction = TrivialWeightFunction<TMapped> >
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TMapped>, typename WeightFunction = TrivialWeightFunction<TMapped>>
class LRUCache
{
public:

View File

@ -27,10 +27,11 @@ namespace ErrorCodes
class Throttler
{
public:
Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_)
: max_speed(max_speed_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_) {}
Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
const std::shared_ptr<Throttler> & parent = nullptr)
: max_speed(max_speed_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_), parent(parent) {}
void add(size_t amount)
void add(const size_t amount)
{
size_t new_count;
UInt64 elapsed_ns = 0;
@ -70,6 +71,9 @@ public:
nanosleep(&sleep_ts, nullptr); /// NOTE Returns early in case of a signal. This is considered normal.
}
}
if (parent)
parent->add(amount);
}
private:
@ -79,6 +83,9 @@ private:
const char * limit_exceeded_exception_message = nullptr;
Stopwatch watch {CLOCK_MONOTONIC_COARSE};
std::mutex mutex;
/// Used to implement a hierarchy of throttlers
std::shared_ptr<Throttler> parent;
};

View File

@ -71,11 +71,12 @@ void ZooKeeper::processCallback(zhandle_t * zh, int type, int state, const char
destroyContext(context);
}
void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_)
{
log = &Logger::get("ZooKeeper");
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
hosts = hosts_;
identity = identity_;
session_timeout_ms = session_timeout_ms_;
impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
@ -84,12 +85,23 @@ void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
if (!impl)
throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts);
default_acl = &ZOO_OPEN_ACL_UNSAFE;
if (!identity.empty())
{
auto code = zoo_add_auth(impl, "digest", identity.c_str(), static_cast<int>(identity.size()), 0, 0);
if (code != ZOK)
throw KeeperException("Zookeeper authentication failed. Hosts are " + hosts, code);
default_acl = &ZOO_CREATOR_ALL_ACL;
}
else
default_acl = &ZOO_OPEN_ACL_UNSAFE;
LOG_TRACE(log, "initialized, hosts: " << hosts);
}
ZooKeeper::ZooKeeper(const std::string & hosts, int32_t session_timeout_ms)
ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity, int32_t session_timeout_ms)
{
init(hosts, session_timeout_ms);
init(hosts, identity, session_timeout_ms);
}
struct ZooKeeperArgs
@ -100,6 +112,7 @@ struct ZooKeeperArgs
config.keys(config_name, keys);
std::vector<std::string> hosts_strings;
std::string root;
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
for (const auto & key : keys)
@ -107,12 +120,22 @@ struct ZooKeeperArgs
if (startsWith(key, "node"))
{
hosts_strings.push_back(
config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port", "2181"));
config.getString(config_name + "." + key + ".host") + ":"
+ config.getString(config_name + "." + key + ".port", "2181")
);
}
else if (key == "session_timeout_ms")
{
session_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "identity")
{
identity = config.getString(config_name + "." + key);
}
else if (key == "root")
{
root = config.getString(config_name + "." + key);
}
else throw KeeperException(std::string("Unknown key ") + key + " in config file");
}
@ -127,16 +150,24 @@ struct ZooKeeperArgs
hosts += ",";
hosts += host;
}
if (!root.empty())
{
if (root.front() != '/')
throw KeeperException(std::string("Root path in config file should start with '/', but got ") + root);
hosts += root;
}
}
std::string hosts;
size_t session_timeout_ms;
std::string identity;
int session_timeout_ms;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, args.session_timeout_ms);
init(args.hosts, args.identity, args.session_timeout_ms);
}
WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event)
@ -710,7 +741,7 @@ ZooKeeper::~ZooKeeper()
ZooKeeperPtr ZooKeeper::startNewSession() const
{
return std::make_shared<ZooKeeper>(hosts, session_timeout_ms);
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms);
}
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)

View File

@ -54,7 +54,7 @@ class ZooKeeper
public:
using Ptr = std::shared_ptr<ZooKeeper>;
ZooKeeper(const std::string & hosts, int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT);
ZooKeeper(const std::string & hosts, const std::string & identity = "", int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT);
/** Config of the form:
<zookeeper>
@ -67,6 +67,10 @@ public:
<port>2181</port>
</node>
<session_timeout_ms>30000</session_timeout_ms>
<!-- Optional. Chroot suffix. Should exist. -->
<root>/path/to/zookeeper/node</root>
<!-- Optional. Zookeeper digest ACL string. -->
<identity>user:password</identity>
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
@ -353,7 +357,7 @@ private:
friend struct WatchContext;
friend class EphemeralNodeHolder;
void init(const std::string & hosts, int32_t session_timeout_ms);
void init(const std::string & hosts, const std::string & identity, int32_t session_timeout_ms);
void removeChildrenRecursive(const std::string & path);
void tryRemoveChildrenRecursive(const std::string & path);
@ -397,6 +401,7 @@ private:
MultiFuture asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception);
std::string hosts;
std::string identity;
int32_t session_timeout_ms;
std::mutex mutex;

View File

@ -11,7 +11,7 @@ int main()
{
try
{
ZooKeeper zk("mtfilter01t:2181,metrika-test:2181,mtweb01t:2181", 5000);
ZooKeeper zk("mtfilter01t:2181,metrika-test:2181,mtweb01t:2181", "", 5000);
Strings children;
std::cout << "create path" << std::endl;

View File

@ -11,7 +11,7 @@
int main(int argc, char ** argv)
{
{
using Cont = HashSet<int, DefaultHash<int>, HashTableGrower<1> >;
using Cont = HashSet<int, DefaultHash<int>, HashTableGrower<1>>;
Cont cont;
cont.insert(1);
@ -36,7 +36,7 @@ int main(int argc, char ** argv)
}
{
using Cont = HashMap<int, std::string, DefaultHash<int>, HashTableGrower<1> >;
using Cont = HashMap<int, std::string, DefaultHash<int>, HashTableGrower<1>>;
Cont cont;
cont.insert(Cont::value_type(1, "Hello, world!"));

View File

@ -31,7 +31,7 @@ while (0)
void run()
{
const std::vector<std::function<bool()> > tests =
const std::vector<std::function<bool()>> tests =
{
test1,
test2,

View File

@ -57,7 +57,7 @@ using Mutex = std::mutex;
Key,
HashTableCellWithLock<
Key,
HashMapCell<Key, Value, DefaultHash<Key> > >,
HashMapCell<Key, Value, DefaultHash<Key>> >,
DefaultHash<Key>,
HashTableGrower<21>,
HashTableAllocator>;*/

View File

@ -37,8 +37,8 @@ STRONG_TYPEDEF(TupleBackend, Tuple); /// Array and Tuple are different types wit
* is not generalized,
* but somewhat more efficient, and simpler.
*
* Used to represent a unit value of one of several types in the RAM.
* Warning! Preferably, instead of single values, store the pieces of the columns. See Column.h
* Used to represent a single value of one of several types in memory.
* Warning! Prefer to use chunks of columns instead of single values. See Column.h
*/
class Field
{

View File

@ -42,7 +42,7 @@ template <typename T> struct IsNullable { static constexpr bool value = false; }
template <typename T> struct IsNullable<Nullable<T>> { static constexpr bool value = true; };
template <typename T> struct IsNumber { static constexpr bool value = false; };
template <typename T> struct IsNumber<Nullable<T> > { static constexpr bool value = IsNumber<T>::value; };
template <typename T> struct IsNumber<Nullable<T>> { static constexpr bool value = IsNumber<T>::value; };
template <> struct IsNumber<UInt8> { static constexpr bool value = true; };
template <> struct IsNumber<UInt16> { static constexpr bool value = true; };

View File

@ -213,12 +213,12 @@ struct DataTypeProduct
};
template <typename T1, typename T2>
struct DataTypeProduct<T1, Nullable<T2> >
struct DataTypeProduct<T1, Nullable<T2>>
{
using Type = typename ToEnrichedDataType<
typename NumberTraits::TypeProduct<
typename ToEnrichedNumericType<T1>::Type,
typename NumberTraits::EmbedType<Nullable<typename T2::FieldType> >::Type
typename NumberTraits::EmbedType<Nullable<typename T2::FieldType>>::Type
>::Type
>::Type;
};

View File

@ -503,16 +503,16 @@ template <> struct EmbedType<Float32> { using Type = Enriched::Float32<HasNoNull
template <> struct EmbedType<Float64> { using Type = Enriched::Float64<HasNoNull>; };
template <> struct EmbedType<Null> { using Type = Enriched::Void<HasNull>; };
template <> struct EmbedType<Nullable<Int8> > { using Type = Enriched::Int8<HasNull>; };
template <> struct EmbedType<Nullable<Int16> > { using Type = Enriched::Int16<HasNull>; };
template <> struct EmbedType<Nullable<Int32> > { using Type = Enriched::Int32<HasNull>; };
template <> struct EmbedType<Nullable<Int64> > { using Type = Enriched::Int64<HasNull>; };
template <> struct EmbedType<Nullable<UInt8> > { using Type = Enriched::UInt8<HasNull>; };
template <> struct EmbedType<Nullable<UInt16> > { using Type = Enriched::UInt16<HasNull>; };
template <> struct EmbedType<Nullable<UInt32> > { using Type = Enriched::UInt32<HasNull>; };
template <> struct EmbedType<Nullable<UInt64> > { using Type = Enriched::UInt64<HasNull>; };
template <> struct EmbedType<Nullable<Float32> > { using Type = Enriched::Float32<HasNull>; };
template <> struct EmbedType<Nullable<Float64> > { using Type = Enriched::Float64<HasNull>; };
template <> struct EmbedType<Nullable<Int8>> { using Type = Enriched::Int8<HasNull>; };
template <> struct EmbedType<Nullable<Int16>> { using Type = Enriched::Int16<HasNull>; };
template <> struct EmbedType<Nullable<Int32>> { using Type = Enriched::Int32<HasNull>; };
template <> struct EmbedType<Nullable<Int64>> { using Type = Enriched::Int64<HasNull>; };
template <> struct EmbedType<Nullable<UInt8>> { using Type = Enriched::UInt8<HasNull>; };
template <> struct EmbedType<Nullable<UInt16>> { using Type = Enriched::UInt16<HasNull>; };
template <> struct EmbedType<Nullable<UInt32>> { using Type = Enriched::UInt32<HasNull>; };
template <> struct EmbedType<Nullable<UInt64>> { using Type = Enriched::UInt64<HasNull>; };
template <> struct EmbedType<Nullable<Float32>> { using Type = Enriched::Float32<HasNull>; };
template <> struct EmbedType<Nullable<Float64>> { using Type = Enriched::Float64<HasNull>; };
/// Get an ordinary type from an enriched type.
template <typename TType>

View File

@ -1,20 +1,19 @@
#include <Dictionaries/Embedded/RegionsHierarchies.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Poco/DirectoryIterator.h>
static constexpr auto config_key = "path_to_regions_hierarchy_file";
RegionsHierarchies::RegionsHierarchies()
: RegionsHierarchies(Poco::Util::Application::instance().config().getString(config_key))
void RegionsHierarchies::reload(const Poco::Util::AbstractConfiguration & config)
{
reload(config.getString(config_key));
}
RegionsHierarchies::RegionsHierarchies(const std::string & path)
void RegionsHierarchies::reload(const std::string & path)
{
Logger * log = &Logger::get("RegionsHierarchies");
@ -47,10 +46,12 @@ RegionsHierarchies::RegionsHierarchies(const std::string & path)
std::forward_as_tuple(dir_it->path()));
}
}
reload();
}
bool RegionsHierarchies::isConfigured()
bool RegionsHierarchies::isConfigured(const Poco::Util::AbstractConfiguration & config)
{
return Poco::Util::Application::instance().config().has(config_key);
return config.has(config_key);
}

View File

@ -1,7 +1,10 @@
#pragma once
#include <Dictionaries/Embedded/RegionsHierarchy.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Exception.h>
#include <unordered_map>
@ -24,11 +27,11 @@ public:
* For example, if /opt/geo/regions_hierarchy.txt is specified,
* then the /opt/geo/regions_hierarchy_ua.txt file will also be loaded, if any, it will be accessible by the `ua` key.
*/
RegionsHierarchies();
explicit RegionsHierarchies(const std::string & path_to_regions_hierarchy_file);
void reload(const Poco::Util::AbstractConfiguration & config);
void reload(const std::string & directory);
/// Has corresponding section in configuration file.
static bool isConfigured();
static bool isConfigured(const Poco::Util::AbstractConfiguration & config);
/** Reloads, if necessary, all hierarchies of regions.

View File

@ -12,13 +12,6 @@
#include <IO/WriteHelpers.h>
static constexpr auto config_key = "path_to_regions_hierarchy_file";
RegionsHierarchy::RegionsHierarchy()
{
path = Poco::Util::Application::instance().config().getString(config_key);
}
RegionsHierarchy::RegionsHierarchy(const std::string & path_)
{

View File

@ -3,7 +3,6 @@
#include <vector>
#include <boost/noncopyable.hpp>
#include <common/Types.h>
#include <ext/singleton.h>
#define REGION_TYPE_CITY 6
@ -61,7 +60,6 @@ private:
std::string path;
public:
RegionsHierarchy();
RegionsHierarchy(const std::string & path_);
/// Reloads, if necessary, the hierarchy of regions. Not threadsafe.
@ -142,13 +140,3 @@ public:
return populations[region];
}
};
class RegionsHierarchySingleton : public ext::singleton<RegionsHierarchySingleton>, public RegionsHierarchy
{
friend class ext::singleton<RegionsHierarchySingleton>;
protected:
RegionsHierarchySingleton()
{
}
};

View File

@ -28,10 +28,9 @@ std::string RegionsNames::dumpSupportedLanguagesNames()
return res;
}
void RegionsNames::reload()
void RegionsNames::reload(const Poco::Util::AbstractConfiguration & config)
{
std::string directory = Poco::Util::Application::instance().config().getString(config_key);
reload(directory);
reload(config.getString(config_key));
}
void RegionsNames::reload(const std::string & directory)
@ -110,7 +109,7 @@ void RegionsNames::reload(const std::string & directory)
}
bool RegionsNames::isConfigured()
bool RegionsNames::isConfigured(const Poco::Util::AbstractConfiguration & config)
{
return Poco::Util::Application::instance().config().has(config_key);
return config.has(config_key);
}

View File

@ -1,10 +1,13 @@
#pragma once
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Exception.h>
#include <common/Types.h>
#include <common/StringRef.h>
#include <string>
#include <vector>
#include <Poco/Exception.h>
#include <common/Types.h>
#include <common/StringRef.h>
/** A class that allows you to recognize by region id its text name in one of the supported languages: ru, en, ua, by, kz, tr.
@ -67,11 +70,11 @@ private:
public:
/** Reboot, if necessary, the names of regions.
*/
void reload();
void reload(const Poco::Util::AbstractConfiguration & config);
void reload(const std::string & directory);
/// Has corresponding section in configuration file.
static bool isConfigured();
static bool isConfigured(const Poco::Util::AbstractConfiguration & config);
StringRef getRegionName(RegionID region_id, Language language = Language::RU) const

View File

@ -10,10 +10,9 @@
static constexpr auto config_key = "mysql_metrica";
void TechDataHierarchy::reload()
void TechDataHierarchy::reload(const Poco::Util::AbstractConfiguration & config)
{
Logger * log = &Logger::get("TechDataHierarchy");
LOG_DEBUG(log, "Loading tech data hierarchy.");
mysqlxx::PoolWithFailover pool(config_key);
@ -53,9 +52,9 @@ void TechDataHierarchy::reload()
}
bool TechDataHierarchy::isConfigured()
bool TechDataHierarchy::isConfigured(const Poco::Util::AbstractConfiguration & config)
{
return Poco::Util::Application::instance().config().has(config_key);
return config.has(config_key);
}
#endif

View File

@ -1,8 +1,12 @@
#pragma once
#include <ext/singleton.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Exception.h>
#include <common/Types.h>
#include <ext/singleton.h>
/** @brief Class that lets you know if a search engine or operating system belongs
* another search engine or operating system, respectively.
@ -15,10 +19,10 @@ private:
UInt8 se_parent[256] {};
public:
void reload();
void reload(const Poco::Util::AbstractConfiguration & config);
/// Has corresponding section in configuration file.
static bool isConfigured();
static bool isConfigured(const Poco::Util::AbstractConfiguration & config);
/// The "belongs" relation.

View File

@ -14,7 +14,7 @@ namespace DB
/** Initialized by vector. Writes data to it. When the vector is finished, it doubles its size.
* CharType - char or unsigned char.
*/
template <typename VectorType = std::vector<char> >
template <typename VectorType = std::vector<char>>
class WriteBufferFromVector : public WriteBuffer
{
private:

View File

@ -67,7 +67,7 @@ void run()
std::string buf5;
prepare4(filename5, buf5);
const std::vector<std::function<bool()> > tests =
const std::vector<std::function<bool()>> tests =
{
std::bind(test1, std::ref(filename)),
std::bind(test2, std::ref(filename), std::ref(buf)),

View File

@ -32,7 +32,7 @@ bool test10();
void run()
{
const std::vector<std::function<bool()> > tests =
const std::vector<std::function<bool()>> tests =
{
test1,
test2,

View File

@ -6,6 +6,7 @@
#include <Interpreters/IInterpreter.h>
#include <Parsers/queryToString.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Interpreters/ProcessList.h>
namespace DB
@ -39,13 +40,24 @@ BlockInputStreams executeQuery(
Context new_context(context);
new_context.setSettings(new_settings);
ThrottlerPtr user_level_throttler;
if (settings.limits.max_network_bandwidth_for_user)
if (auto process_list_element = context.getProcessListElement())
if (auto user_process_list = process_list_element->user_process_list)
user_level_throttler = user_process_list->user_throttler;
/// Network bandwidth limit, if needed.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
{
throttler = std::make_shared<Throttler>(
settings.limits.max_network_bandwidth,
settings.limits.max_network_bytes,
"Limit for bytes to send or receive over network exceeded.");
settings.limits.max_network_bandwidth,
settings.limits.max_network_bytes,
"Limit for bytes to send or receive over network exceeded.",
user_level_throttler);
}
else if (settings.limits.max_network_bandwidth_for_user)
throttler = user_level_throttler;
for (const auto & shard_info : cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);

View File

@ -105,6 +105,8 @@ struct ContextShared
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; /// Path to the directory with some control flags for server maintenance.
ConfigurationPtr config; /// Global configuration settings.
Databases databases; /// List of databases and tables in them.
FormatFactory format_factory; /// Formats.
mutable std::shared_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaeis. Have lazy initialization.
@ -484,6 +486,23 @@ void Context::setFlagsPath(const String & path)
shared->flags_path = path;
}
void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->config = config;
}
ConfigurationPtr Context::getConfig() const
{
auto lock = getLock();
return shared->config;
}
Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
return shared->config ? *shared->config : Poco::Util::Application::instance().config();
}
void Context::setUsersConfig(const ConfigurationPtr & config)
{
@ -499,7 +518,6 @@ ConfigurationPtr Context::getUsersConfig()
return shared->users_config;
}
void Context::calculateUserSettings()
{
auto lock = getLock();
@ -1031,7 +1049,7 @@ EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_
std::lock_guard<std::mutex> lock(shared->embedded_dictionaries_mutex);
if (!shared->embedded_dictionaries)
shared->embedded_dictionaries = std::make_shared<EmbeddedDictionaries>(throw_on_error);
shared->embedded_dictionaries = std::make_shared<EmbeddedDictionaries>(*this->global_context, throw_on_error);
return *shared->embedded_dictionaries;
}
@ -1087,6 +1105,11 @@ ProcessList::Element * Context::getProcessListElement()
return process_list_elem;
}
const ProcessList::Element * Context::getProcessListElement() const
{
return process_list_elem;
}
void Context::setUncompressedCache(size_t max_size_in_bytes)
{
@ -1237,7 +1260,10 @@ std::pair<String, UInt16> Context::getInterserverIOAddress() const
UInt16 Context::getTCPPort() const
{
return Poco::Util::Application::instance().config().getInt("tcp_port");
auto lock = getLock();
auto & config = getConfigRef();
return config.getInt("tcp_port");
}
@ -1264,7 +1290,7 @@ Clusters & Context::getClusters() const
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
if (!shared->clusters)
{
auto & config = shared->clusters_config ? *shared->clusters_config : Poco::Util::Application::instance().config();
auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
shared->clusters = std::make_unique<Clusters>(config, settings);
}
}
@ -1310,7 +1336,7 @@ QueryLog & Context::getQueryLog()
if (!global_context)
throw Exception("Logical error: no global context for query log", ErrorCodes::LOGICAL_ERROR);
auto & config = Poco::Util::Application::instance().config();
auto & config = getConfigRef();
String database = config.getString("query_log.database", "system");
String table = config.getString("query_log.table", "query_log");
@ -1329,7 +1355,7 @@ PartLog * Context::getPartLog(const String & database, const String & table)
{
auto lock = getLock();
auto & config = Poco::Util::Application::instance().config();
auto & config = getConfigRef();
if (!config.has("part_log"))
return nullptr;
@ -1370,7 +1396,7 @@ CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part
if (!shared->compression_method_selector)
{
constexpr auto config_name = "compression";
auto & config = Poco::Util::Application::instance().config();
auto & config = getConfigRef();
if (config.has(config_name))
shared->compression_method_selector = std::make_unique<CompressionMethodSelector>(config, "compression");
@ -1388,7 +1414,7 @@ const MergeTreeSettings & Context::getMergeTreeSettings()
if (!shared->merge_tree_settings)
{
auto & config = Poco::Util::Application::instance().config();
auto & config = getConfigRef();
shared->merge_tree_settings = std::make_unique<MergeTreeSettings>();
shared->merge_tree_settings->loadFromConfig("merge_tree", config);
}

View File

@ -128,12 +128,16 @@ public:
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Global application configuration settings.
void setConfig(const ConfigurationPtr & config);
ConfigurationPtr getConfig() const;
Poco::Util::AbstractConfiguration & getConfigRef() const;
/** Take the list of users, quotas and configuration profiles from this config.
* The list of users is completely replaced.
* The accumulated quota values are not reset if the quota is not deleted.
*/
void setUsersConfig(const ConfigurationPtr & config);
ConfigurationPtr getUsersConfig();
/// Must be called before getClientInfo.
@ -267,6 +271,7 @@ public:
void setProcessListElement(ProcessListElement * elem);
/// Can return nullptr if the query was not inserted into the ProcessList.
ProcessListElement * getProcessListElement();
const ProcessListElement * getProcessListElement() const;
/// List all queries.
ProcessList & getProcessList();

View File

@ -507,7 +507,8 @@ bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, Exec
try
{
executeQuery(istr, ostr, false, context, nullptr);
Context local_context(context);
executeQuery(istr, ostr, false, local_context, nullptr);
}
catch (...)
{

View File

@ -1,12 +1,15 @@
#include <common/logger_useful.h>
#include <Poco/Util/Application.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Dictionaries/Embedded/RegionsHierarchies.h>
#include <Dictionaries/Embedded/TechDataHierarchy.h>
#include <Dictionaries/Embedded/RegionsNames.h>
#include <Dictionaries/Embedded/TechDataHierarchy.h>
#include <Interpreters/Context.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/config.h>
#include <common/logger_useful.h>
#include <Poco/Util/Application.h>
namespace DB
@ -31,7 +34,9 @@ void EmbeddedDictionaries::handleException(const bool throw_on_error) const
template <typename Dictionary>
bool EmbeddedDictionaries::reloadDictionary(MultiVersion<Dictionary> & dictionary, const bool throw_on_error, const bool force_reload)
{
bool defined_in_config = Dictionary::isConfigured();
const auto & config = context.getConfigRef();
bool defined_in_config = Dictionary::isConfigured(config);
bool not_initialized = dictionary.get() == nullptr;
if (defined_in_config && (force_reload || !is_fast_start_stage || not_initialized))
@ -39,7 +44,7 @@ bool EmbeddedDictionaries::reloadDictionary(MultiVersion<Dictionary> & dictionar
try
{
auto new_dictionary = std::make_unique<Dictionary>();
new_dictionary->reload();
new_dictionary->reload(config);
dictionary.set(new_dictionary.release());
}
catch (...)
@ -110,20 +115,16 @@ void EmbeddedDictionaries::reloadPeriodically()
}
EmbeddedDictionaries::EmbeddedDictionaries(const bool throw_on_error, const int reload_period_)
: reload_period(reload_period_), log(&Logger::get("EmbeddedDictionaries"))
EmbeddedDictionaries::EmbeddedDictionaries(Context & context_, const bool throw_on_error)
: log(&Logger::get("EmbeddedDictionaries"))
, context(context_)
, reload_period(context_.getConfigRef().getInt("builtin_dictionaries_reload_interval", 3600))
{
reloadImpl(throw_on_error);
reloading_thread = std::thread([this] { reloadPeriodically(); });
}
EmbeddedDictionaries::EmbeddedDictionaries(const bool throw_on_error)
: EmbeddedDictionaries(throw_on_error,
Poco::Util::Application::instance().config().getInt("builtin_dictionaries_reload_interval", 3600))
{}
EmbeddedDictionaries::~EmbeddedDictionaries()
{
destroy.set();

View File

@ -23,6 +23,9 @@ class Context;
class EmbeddedDictionaries
{
private:
Poco::Logger * log;
Context & context;
MultiVersion<RegionsHierarchies> regions_hierarchies;
MultiVersion<TechDataHierarchy> tech_data_hierarchy;
MultiVersion<RegionsNames> regions_names;
@ -37,8 +40,6 @@ private:
std::thread reloading_thread;
Poco::Event destroy;
Poco::Logger * log;
void handleException(const bool throw_on_error) const;
@ -56,9 +57,7 @@ private:
public:
/// Every reload_period seconds directories are updated inside a separate thread.
EmbeddedDictionaries(const bool throw_on_error, const int reload_period_);
EmbeddedDictionaries(const bool throw_on_error);
EmbeddedDictionaries(Context & context, const bool throw_on_error);
/// Forcibly reloads all dictionaries.
void reload();

View File

@ -1,4 +1,5 @@
#include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/Context.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/IDictionarySource.h>
@ -223,7 +224,7 @@ void ExternalDictionaries::reloadAndUpdate(bool throw_on_error)
void ExternalDictionaries::reloadFromConfigFiles(const bool throw_on_error, const bool force_reload, const std::string & only_dictionary)
{
const auto config_paths = getDictionariesConfigPaths(Poco::Util::Application::instance().config());
const auto config_paths = getDictionariesConfigPaths(context.getConfigRef());
for (const auto & config_path : config_paths)
{

View File

@ -98,6 +98,8 @@ struct Limits
M(SettingUInt64, max_network_bandwidth, 0) \
/** The maximum number of bytes to receive or transmit over the network, as part of the query. */ \
M(SettingUInt64, max_network_bytes, 0) \
/** The maximum speed of data exchange over the network for the user in bytes per second. 0 - not bounded. */ \
M(SettingUInt64, max_network_bandwidth_for_user, 0)
#define DECLARE(TYPE, NAME, DEFAULT) \
TYPE NAME {DEFAULT};

View File

@ -101,6 +101,14 @@ ProcessList::EntryPtr ProcessList::insert(
total_memory_tracker.setDescription("(total)");
user_process_list.user_memory_tracker.setNext(&total_memory_tracker);
}
if (settings.limits.max_network_bandwidth_for_user && !user_process_list.user_throttler)
{
user_process_list.user_throttler = std::make_shared<Throttler>(settings.limits.max_network_bandwidth_for_user, 0,
"Network bandwidth limit for a user exceeded.");
}
res->get().user_process_list = &user_process_list;
}
}
@ -124,21 +132,22 @@ ProcessListEntry::~ProcessListEntry()
/// This removes the memory_tracker of one request.
parent.cont.erase(it);
ProcessList::UserToQueries::iterator user_process_list = parent.user_to_queries.find(user);
auto user_process_list = parent.user_to_queries.find(user);
if (user_process_list != parent.user_to_queries.end())
{
/// In case the request is canceled, the data about it is deleted from the map at the time of cancellation, and not here.
if (!is_cancelled && !query_id.empty())
{
ProcessListForUser::QueryToElement::iterator element = user_process_list->second.queries.find(query_id);
auto element = user_process_list->second.queries.find(query_id);
if (element != user_process_list->second.queries.end())
user_process_list->second.queries.erase(element);
}
/// This removes the memory_tracker from the user. At this time, the memory_tracker that references it does not live.
/// If there are no more queries for the user, then we delete the record.
/// If there are no more queries for the user, then we delete the entry.
/// This also clears the MemoryTracker for the user, and a message about the memory consumption is output to the log.
/// This also clears network bandwidth Throttler, so it will not count periods of inactivity.
/// Sometimes it is important to reset the MemoryTracker, because it may accumulate skew
/// due to the fact that there are cases when memory can be allocated while processing the request, but released later.
if (user_process_list->second.queries.empty())

View File

@ -14,6 +14,7 @@
#include <Interpreters/ClientInfo.h>
#include <Common/CurrentMetrics.h>
#include <DataStreams/BlockIO.h>
#include <Common/Throttler.h>
namespace CurrentMetrics
@ -29,6 +30,7 @@ using StoragePtr = std::shared_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
struct Settings;
class IAST;
struct ProcessListForUser;
/** List of currently executing queries.
@ -76,6 +78,9 @@ struct ProcessListElement
/// Temporary tables could be registered here. Modify under mutex.
Tables temporary_tables;
/// Be careful using it. For example, queries field could be modified concurrently.
const ProcessListForUser * user_process_list = nullptr;
protected:
mutable std::mutex query_streams_mutex;
@ -169,6 +174,9 @@ struct ProcessListForUser
/// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker;
/// Count network usage for all simultaneously running queries of single user.
ThrottlerPtr user_throttler;
};
@ -214,10 +222,15 @@ private:
mutable std::mutex mutex;
mutable Poco::Condition have_space; /// Number of currently running queries has become less than maximum.
/// List of queries
Container cont;
size_t cur_size; /// In C++03 or C++11 and old ABI, std::list::size is not O(1).
size_t max_size; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
/// Stores per-user info: queries, statistics and limits
UserToQueries user_to_queries;
/// Stores info about queries grouped by their priority
QueryPriorities priorities;
/// Limit and counter for memory of all simultaneously running queries.

View File

@ -244,8 +244,8 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
std::unordered_map<Key, Value, DefaultHash<Key> > map;
std::unordered_map<Key, Value, DefaultHash<Key> >::iterator it;
std::unordered_map<Key, Value, DefaultHash<Key>> map;
std::unordered_map<Key, Value, DefaultHash<Key>>::iterator it;
for (size_t i = 0; i < n; ++i)
{
it = map.insert(std::make_pair(data[i], std::move(value))).first;
@ -264,8 +264,8 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
google::dense_hash_map<Key, Value, DefaultHash<Key> > map;
google::dense_hash_map<Key, Value, DefaultHash<Key> >::iterator it;
google::dense_hash_map<Key, Value, DefaultHash<Key>> map;
google::dense_hash_map<Key, Value, DefaultHash<Key>>::iterator it;
map.set_empty_key(-1ULL);
for (size_t i = 0; i < n; ++i)
{
@ -285,8 +285,8 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
google::sparse_hash_map<Key, Value, DefaultHash<Key> > map;
google::sparse_hash_map<Key, Value, DefaultHash<Key> >::iterator it;
google::sparse_hash_map<Key, Value, DefaultHash<Key>> map;
google::sparse_hash_map<Key, Value, DefaultHash<Key>>::iterator it;
for (size_t i = 0; i < n; ++i)
{
map.insert(std::make_pair(data[i], std::move(value)));

View File

@ -434,7 +434,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
std::unordered_map<Key, Value, DefaultHash<Key> > map;
std::unordered_map<Key, Value, DefaultHash<Key>> map;
for (size_t i = 0; i < n; ++i)
++map[data[i]];
@ -450,7 +450,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
google::dense_hash_map<Key, Value, DefaultHash<Key> > map;
google::dense_hash_map<Key, Value, DefaultHash<Key>> map;
map.set_empty_key(Key("\0", 1));
for (size_t i = 0; i < n; ++i)
++map[data[i]];
@ -467,7 +467,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
google::sparse_hash_map<Key, Value, DefaultHash<Key> > map;
google::sparse_hash_map<Key, Value, DefaultHash<Key>> map;
for (size_t i = 0; i < n; ++i)
++map[data[i]];

View File

@ -49,9 +49,9 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
std::cerr << sizeof(HashMapCell<Key, Value, DefaultHash<Key> >) << std::endl;
std::cerr << sizeof(HashMapCell<Key, Value, DefaultHash<Key>>) << std::endl;
using Map = TwoLevelHashTable<Key, HashMapCell<Key, Value, DefaultHash<Key> >, DefaultHash<Key>, HashTableGrower<8>, HashTableAllocator>;
using Map = TwoLevelHashTable<Key, HashMapCell<Key, Value, DefaultHash<Key>>, DefaultHash<Key>, HashTableGrower<8>, HashTableAllocator>;
Map map;
Map::iterator it;
@ -86,7 +86,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
using Map = TwoLevelHashTable<Key, HashMapCell<Key, Value, DefaultHash<Key> >, DefaultHash<Key>, HashTableGrower<8>, HashTableAllocator>;
using Map = TwoLevelHashTable<Key, HashMapCell<Key, Value, DefaultHash<Key>>, DefaultHash<Key>, HashTableGrower<8>, HashTableAllocator>;
//using Map = HashMap<Key, Value, UniquesHashSetDefaultHash>;
Map map;

View File

@ -130,9 +130,10 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
}
static std::chrono::steady_clock::duration parseSessionTimeout(const HTMLForm & params)
static std::chrono::steady_clock::duration parseSessionTimeout(
const Poco::Util::AbstractConfiguration & config,
const HTMLForm & params)
{
const auto & config = Poco::Util::Application::instance().config();
unsigned session_timeout = config.getInt("default_session_timeout", 60);
if (params.has("session_timeout"))
@ -245,7 +246,7 @@ void HTTPHandler::processQuery(
if (session_is_set)
{
session_id = params.get("session_id");
session_timeout = parseSessionTimeout(params);
session_timeout = parseSessionTimeout(server.config(), params);
std::string session_check = params.get("session_check", "");
session = context.acquireSession(session_id, session_timeout, session_check == "1");

View File

@ -1,15 +1,21 @@
#include "MetricsTransmitter.h"
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <daemon/BaseDaemon.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/Context.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <daemon/BaseDaemon.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace DB
{
MetricsTransmitter::~MetricsTransmitter()
{
try
@ -32,7 +38,7 @@ MetricsTransmitter::~MetricsTransmitter()
void MetricsTransmitter::run()
{
auto & config = Poco::Util::Application::instance().config();
const auto & config = context.getConfigRef();
auto interval = config.getInt(config_name + ".interval", 60);
const std::string thread_name = "MericsTrns " + std::to_string(interval) + "s";
@ -63,7 +69,7 @@ void MetricsTransmitter::run()
void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters)
{
auto & config = Poco::Util::Application::instance().config();
const auto & config = context.getConfigRef();
auto async_metrics_values = async_metrics.getValues();
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};

View File

@ -10,7 +10,10 @@
namespace DB
{
class AsynchronousMetrics;
class Context;
/** Automatically sends
* - difference of ProfileEvents;
@ -21,8 +24,12 @@ class AsynchronousMetrics;
class MetricsTransmitter
{
public:
MetricsTransmitter(const AsynchronousMetrics & async_metrics, const std::string & config_name)
: async_metrics{async_metrics}, config_name{config_name}
MetricsTransmitter(Context & context_,
const AsynchronousMetrics & async_metrics_,
const std::string & config_name_)
: context(context_)
, async_metrics(async_metrics_)
, config_name(config_name_)
{
}
~MetricsTransmitter();
@ -31,6 +38,8 @@ private:
void run();
void transmit(std::vector<ProfileEvents::Count> & prev_counters);
Context & context;
const AsynchronousMetrics & async_metrics;
const std::string config_name;
@ -43,4 +52,5 @@ private:
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";
static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics.";
};
}

View File

@ -530,7 +530,8 @@ int Server::main(const std::vector<std::string> & args)
std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(async_metrics, graphite_key));
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(
*global_context, async_metrics, graphite_key));
}
SessionCleaner session_cleaner(*global_context);

View File

@ -57,16 +57,14 @@ namespace
if (!user_pw_end || !colon)
throw Exception{
"Shard address '" + address + "' does not match to 'user[:password]@host:port#default_database' pattern",
ErrorCodes::INCORRECT_FILE_NAME
};
ErrorCodes::INCORRECT_FILE_NAME};
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception{
"Shard address '" + address + "' does not contain port",
ErrorCodes::INCORRECT_FILE_NAME
};
ErrorCodes::INCORRECT_FILE_NAME};
const char * has_db = strchr(address.data(), '#');
const char * port_end = has_db ? has_db : address_end;

View File

@ -1304,7 +1304,7 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
for (const DataPartPtr & part : remove)
{
part->remove_time = clear_without_timeout ? 0 : time(0);
part->remove_time = clear_without_timeout ? 0 : time(nullptr);
if (data_parts.erase(part))
removePartContributionToColumnSizes(part);

View File

@ -145,7 +145,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
MergeTreeData::DataPartsVector parts = data.getDataPartsVector();
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
/// The virtual column `_sample_factor - 1 / <used sampling factor>` can be requested in the query.
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
Names virt_column_names;
Names real_column_names;

View File

@ -20,7 +20,7 @@ class IFunction;
using FunctionPtr = std::shared_ptr<IFunction>;
/** Range with open or closed ends; Perhaps unlimited.
/** Range with open or closed ends; possibly unbounded.
*/
struct Range
{
@ -31,12 +31,12 @@ private:
public:
Field left; /// the left border, if any
Field right; /// the right border, if any
bool left_bounded = false; /// limited to the left
bool right_bounded = false; /// limited to the right
bool left_bounded = false; /// bounded at the left
bool right_bounded = false; /// bounded at the right
bool left_included = false; /// includes the left border, if any
bool right_included = false; /// includes the right border, if any
/// The whole set.
/// The whole unversum.
Range() {}
/// One point.
@ -148,7 +148,7 @@ public:
/// r to the right of me.
if (r.left_bounded
&& right_bounded
&& (less(right, r.left) /// ...} {...
&& (less(right, r.left) /// ...} {...
|| ((!right_included || !r.left_included) /// ...) [... or ...] (...
&& equals(r.left, right))))
return false;
@ -193,15 +193,15 @@ public:
/** Condition on the index.
*
* Consists of the conditions for the key belonging to all possible ranges or sets,
* as well as logical links AND/OR/NOT above these conditions.
* as well as logical operators AND/OR/NOT above these conditions.
*
* Constructs a reverse polish notation from these conditions
* and can calculate (interpret) its feasibility over key ranges.
* and can calculate (interpret) its satisfiability over key ranges.
*/
class PKCondition
{
public:
/// Does not include the SAMPLE section. all_columns - the set of all columns of the table.
/// Does not take into account the SAMPLE section. all_columns - the set of all columns of the table.
PKCondition(
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -195,7 +195,7 @@ static void appendGraphitePattern(const Context & context,
static void setGraphitePatternsFromConfig(const Context & context,
const String & config_element, Graphite::Params & params)
{
const Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
const auto & config = context.getConfigRef();
if (!config.has(config_element))
throw Exception("No '" + config_element + "' element in configuration file",

View File

@ -185,7 +185,7 @@ private:
}
};
using MarksForColumns = std::vector<std::pair<size_t, Mark> >;
using MarksForColumns = std::vector<std::pair<size_t, Mark>>;
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
FileStreams streams;

View File

@ -1419,9 +1419,10 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name);
/// We don't change table structure, only data in some parts, disable reading from them
/// We don't change table structure, only data in some parts
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
/// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238)
auto lock_read_structure = lockStructure(false);
auto lock_write_data = lockDataForAlter();
auto zookeeper = getZooKeeper();
@ -3708,7 +3709,7 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
ThreadPool pool(task_info_list.size());
using Tasks = std::vector<std::packaged_task<size_t()> >;
using Tasks = std::vector<std::packaged_task<size_t()>>;
Tasks tasks(task_info_list.size());
try
@ -3790,11 +3791,26 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
if (!count)
return;
/// Part names that were successfully deleted from filesystem and should be deleted from ZooKeeper
Strings part_names;
auto remove_from_zookeeper = [&] ()
{
LOG_DEBUG(log, "Removed " << part_names.size() << " old parts from filesystem. Removing them from ZooKeeper.");
try
{
removePartsFromZooKeeper(zookeeper, part_names);
}
catch (...)
{
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
}
};
try
{
LOG_DEBUG(log, "Removing " << parts.size() << " old parts from filesystem");
Strings part_names;
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
@ -3802,42 +3818,81 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
part_names.emplace_back(part->name);
parts.pop_back();
}
LOG_DEBUG(log, "Removed " << part_names.size() << " old parts from filesystem. Removing them from ZooKeeper.");
try
{
removePartsFromZooKeeper(zookeeper, part_names);
}
catch (const zkutil::KeeperException & e)
{
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Finalize deletion of parts already deleted from filesystem, rollback remaining parts
data.addOldParts(parts);
remove_from_zookeeper();
throw;
}
/// Finalize deletion
remove_from_zookeeper();
LOG_DEBUG(log, "Removed " << count << " old parts");
}
static int32_t tryMultiWithRetries(zkutil::ZooKeeperPtr & zookeeper, zkutil::Ops & ops) noexcept
{
int32_t code;
try
{
code = zookeeper->tryMultiWithRetries(ops);
}
catch (const zkutil::KeeperException & e)
{
code = e.code;
}
return code;
}
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names)
{
zkutil::Ops ops;
auto it_first_node_in_batch = part_names.cbegin();
for (auto it = part_names.cbegin(); it != part_names.cend(); ++it)
{
removePartFromZooKeeper(*it, ops);
if (ops.size() >= zkutil::MULTI_BATCH_SIZE || next(it) == part_names.cend())
auto it_next = std::next(it);
if (ops.size() >= zkutil::MULTI_BATCH_SIZE || it_next == part_names.cend())
{
/// It is Ok to use multi with retries to delete nodes, because new nodes with the same names cannot appear here
zookeeper->tryMultiWithRetries(ops);
auto code = tryMultiWithRetries(zookeeper, ops);
ops.clear();
if (code == ZNONODE)
{
/// Fallback
LOG_DEBUG(log, "There are no some part nodes in ZooKeeper, will remove part nodes sequentially");
for (auto it_in_batch = it_first_node_in_batch; it_in_batch != it_next; ++it_in_batch)
{
zkutil::Ops cur_ops;
removePartFromZooKeeper(*it_in_batch, cur_ops);
auto cur_code = tryMultiWithRetries(zookeeper, cur_ops);
if (cur_code == ZNONODE)
LOG_DEBUG(log, "There is no part " << *it_in_batch << " in ZooKeeper, it was only in filesystem");
else if (cur_code != ZOK)
LOG_WARNING(log, "Cannot remove part " << *it_in_batch << " from ZooKeeper: " << ::zerror(cur_code));
}
}
else if (code != ZOK)
{
LOG_WARNING(log, "There was a problem with deleting " << (it_next - it_first_node_in_batch)
<< " nodes from ZooKeeper: " << ::zerror(code));
}
it_first_node_in_batch = it_next;
}
}
}

View File

@ -1,11 +1,12 @@
#include <Storages/System/StorageSystemGraphite.h>
#include <Core/Field.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Field.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Poco/Util/Application.h>
@ -63,9 +64,10 @@ static Pattern readOnePattern(
return pattern;
}
static std::vector<Pattern> readPatterns(const std::string & section)
static std::vector<Pattern> readPatterns(
const AbstractConfiguration & config,
const std::string & section)
{
const AbstractConfiguration & config = Application::instance().config();
AbstractConfiguration::Keys keys;
std::vector<Pattern> result;
size_t count = 0;
@ -92,9 +94,8 @@ static std::vector<Pattern> readPatterns(const std::string & section)
return result;
}
static Strings getAllGraphiteSections()
static Strings getAllGraphiteSections(const AbstractConfiguration & config)
{
const AbstractConfiguration & config = Application::instance().config();
Strings result;
AbstractConfiguration::Keys keys;
@ -180,10 +181,12 @@ BlockInputStreams StorageSystemGraphite::read(
col_is_default.column = std::make_shared<ColumnUInt8>();
block.insert(col_is_default);
Strings sections = getAllGraphiteSections();
const auto & config = context.getConfigRef();
Strings sections = getAllGraphiteSections(config);
for (const auto & section : sections)
{
const auto patterns = readPatterns(section);
const auto patterns = readPatterns(config, section);
for (const auto & pattern : patterns)
{
for (const auto & ret : pattern.retentions)

View File

@ -12,6 +12,7 @@ from dicttoxml import dicttoxml
import xml.dom.minidom
import docker
from docker.errors import ContainerError
from .client import Client, CommandRequest
@ -28,13 +29,15 @@ class ClickHouseCluster:
these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
"""
def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None):
def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
zookeeper_config_path=None):
self.base_dir = p.dirname(base_path)
self.name = name if name is not None else ''
self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(HELPERS_DIR, 'zookeeper_config.xml')
self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
# docker-compose removes everything non-alphanumeric from project names so we do it too.
@ -42,6 +45,8 @@ class ClickHouseCluster:
self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.base_zookeeper_cmd = None
self.pre_zookkeeper_commands = []
self.instances = {}
self.with_zookeeper = False
@ -68,13 +73,15 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper,
self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.zookeeper_config_path, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
if with_zookeeper and not self.with_zookeeper:
self.with_zookeeper = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
return instance
@ -102,10 +109,15 @@ class ClickHouseCluster:
for instance in self.instances.values():
instance.create_dir(destroy_dir=destroy_dirs)
subprocess.check_call(self.base_cmd + ['up', '-d'])
self.docker_client = docker.from_env()
if self.with_zookeeper and self.base_zookeeper_cmd:
subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
for command in self.pre_zookkeeper_commands:
self.run_zookeeper_client_command(command, repeats=5)
subprocess.check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds
for instance in self.instances.itervalues():
instance.docker_client = self.docker_client
@ -123,7 +135,7 @@ class ClickHouseCluster:
def shutdown(self, kill=True):
if kill:
subprocess.check_call(self.base_cmd + ['kill'])
subprocess.check_call(self.base_cmd + ['down', '--volumes'])
subprocess.check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
self.is_up = False
self.docker_client = None
@ -134,6 +146,22 @@ class ClickHouseCluster:
instance.client = None
def run_zookeeper_client_command(self, command, zoo_node = 'zoo1', repeats=1, sleep_for=1):
cli_cmd = 'zkCli.sh ' + command
zoo_name = self.get_instance_docker_id(zoo_node)
network_mode = 'container:' + zoo_name
for i in range(0, repeats - 1):
try:
return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
except ContainerError:
time.sleep(sleep_for)
return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
def add_zookeeper_startup_command(self, command):
self.pre_zookkeeper_commands.append(command)
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
@ -157,7 +185,7 @@ services:
class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
with_zookeeper, zookeeper_config_path, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
self.name = name
self.base_cmd = cluster.base_cmd[:]
@ -171,6 +199,7 @@ class ClickHouseInstance:
self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
self.macroses = macroses if macroses is not None else {}
self.with_zookeeper = with_zookeeper
self.zookeeper_config_path = zookeeper_config_path
self.base_configs_dir = base_configs_dir
self.server_bin_path = server_bin_path
@ -287,7 +316,7 @@ class ClickHouseInstance:
# Put ZooKeeper config
if self.with_zookeeper:
shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir)
shutil.copy(self.zookeeper_config_path, config_d_dir)
# Copy config dir
if self.custom_config_dir:

View File

@ -0,0 +1,17 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,18 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
<root>/root_a</root>
</zookeeper>
</yandex>

View File

@ -0,0 +1,18 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
<root>/root_b</root>
</zookeeper>
</yandex>

View File

@ -0,0 +1,18 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
<identity>user:password</identity>
</zookeeper>
</yandex>

View File

@ -0,0 +1,96 @@
from helpers.cluster import ClickHouseCluster
import pytest
def test_chroot_with_same_root():
cluster_1 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
cluster_2 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
node1 = cluster_1.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
nodes = [node1, node2]
cluster_1.add_zookeeper_startup_command('create /root_a ""')
cluster_1.add_zookeeper_startup_command('ls / ')
try:
cluster_1.start()
try:
cluster_2.start(destroy_dirs=False)
for i, node in enumerate(nodes):
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
node.query("INSERT INTO simple VALUES ({0}, {0})".format(i))
assert node1.query('select count() from simple').strip() == '2'
assert node2.query('select count() from simple').strip() == '2'
finally:
cluster_2.shutdown()
finally:
cluster_1.shutdown()
def test_chroot_with_different_root():
cluster_1 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
cluster_2 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_b.xml')
node1 = cluster_1.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
nodes = [node1, node2]
cluster_1.add_zookeeper_startup_command('create /root_a ""')
cluster_1.add_zookeeper_startup_command('create /root_b ""')
cluster_1.add_zookeeper_startup_command('ls / ')
try:
cluster_1.start()
try:
cluster_2.start(destroy_dirs=False)
for i, node in enumerate(nodes):
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
node.query("INSERT INTO simple VALUES ({0}, {0})".format(i))
assert node1.query('select count() from simple').strip() == '1'
assert node2.query('select count() from simple').strip() == '1'
finally:
cluster_2.shutdown()
finally:
cluster_1.shutdown()
def test_identity():
cluster_1 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_with_password.xml')
cluster_2 = ClickHouseCluster(__file__)
node1 = cluster_1.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
try:
cluster_1.start()
# node1.query('''
# CREATE TABLE simple (date Date, id UInt32)
# ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
# '''.format(replica=node1.name))
with pytest.raises(Exception):
cluster_2.start(destroy_dirs=False)
finally:
cluster_1.shutdown()
cluster_2.shutdown()

View File

@ -0,0 +1,4 @@
0 [0,4,8,12,16] 28 0 16
1 [1,5,9,13,17] 29 1 17
2 [2,6,10,14,18] 30 2 18
3 [3,7,11,15,19] 31 3 19

View File

@ -0,0 +1 @@
SELECT number % 4 AS k, groupArray(number), groupBitOr(number), groupBitAnd(number), groupBitXor(number) FROM (SELECT * FROM system.numbers LIMIT 20) GROUP BY k ORDER BY k;

46
debian/.pbuilderrc vendored
View File

@ -1,15 +1,20 @@
# ubuntu:
# prepare:
# ln -s gutsy /usr/share/debootstrap/scripts/artful
# echo "3.0 (native)" > debian/source/format
# build ubuntu:
# sudo DIST=trusty pbuilder create --configfile debian/.pbuilderrc && DIST=trusty pdebuild --configfile debian/.pbuilderrc
# sudo DIST=xenial pbuilder create --configfile debian/.pbuilderrc && DIST=xenial pdebuild --configfile debian/.pbuilderrc
# sudo DIST=zesty pbuilder create --configfile debian/.pbuilderrc && DIST=zesty pdebuild --configfile debian/.pbuilderrc
# debian:
# sudo DIST=artful pbuilder create --configfile debian/.pbuilderrc && DIST=artful pdebuild --configfile debian/.pbuilderrc
# build debian:
# sudo DIST=experimental pbuilder create --configfile debian/.pbuilderrc && DIST=experimental pdebuild --configfile debian/.pbuilderrc
# sudo DIST=testing pbuilder create --configfile debian/.pbuilderrc && DIST=testing pdebuild --configfile debian/.pbuilderrc
# sudo DIST=unstable pbuilder create --configfile debian/.pbuilderrc && DIST=unstable pdebuild --configfile debian/.pbuilderrc
# sudo DIST=stable pbuilder create --configfile debian/.pbuilderrc && DIST=stable pdebuild --configfile debian/.pbuilderrc
# BROKEN:
# sudo DIST=xenial pbuilder create --configfile debian/.pbuilderrc && DIST=xenial pdebuild --configfile debian/.pbuilderrc
# sudo DIST=unstable pbuilder create --configfile debian/.pbuilderrc && DIST=unstable pdebuild --configfile debian/.pbuilderrc
# TODO:
# sudo DIST=zesty ARCH=i386 pbuilder create --configfile debian/.pbuilderrc && DIST=zesty ARCH=i386 pdebuild --configfile debian/.pbuilderrc
# sudo DIST=experimental ARCH=arm64 pbuilder create --configfile debian/.pbuilderrc && DIST=experimental ARCH=arm64 pdebuild --configfile debian/.pbuilderrc
# from https://wiki.debian.org/PbuilderTricks :
@ -25,14 +30,14 @@ DEBIAN_SUITES=($UNSTABLE_CODENAME $TESTING_CODENAME $STABLE_CODENAME $STABLE_BAC
"experimental" "unstable" "testing" "stable")
# List of Ubuntu suites. Update these when needed.
UBUNTU_SUITES=("zesty" "xenial" "trusty")
UBUNTU_SUITES=("artful" "zesty" "xenial" "trusty")
# Mirrors to use. Update these to your preferred mirror.
#DEBIAN_MIRROR="deb.debian.org"
DEBIAN_MIRROR="deb.debian.org"
#UBUNTU_MIRROR="mirrors.kernel.org"
UBUNTU_MIRROR="mirror.yandex.ru"
DEBIAN_MIRROR="mirror.yandex.ru"
#DEBIAN_MIRROR="mirror.yandex.ru"
# Optionally use the changelog of a package to determine the suite to use if
# none set.
@ -89,9 +94,15 @@ elif $(echo ${UBUNTU_SUITES[@]} | grep -q $DIST); then
OSNAME=ubuntu
MIRRORSITE="http://$UBUNTU_MIRROR/$OSNAME/"
COMPONENTS="main restricted universe multiverse"
OTHERMIRROR+="deb http://ppa.launchpad.net/ubuntu-toolchain-r/test/$OSNAME $DIST main"
case "$DIST" in
"trusty" | "xenial" )
OTHERMIRROR+="deb http://ppa.launchpad.net/ubuntu-toolchain-r/test/$OSNAME $DIST main"
ALLOWUNTRUSTED=yes
;;
esac
# deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty-5.0 main
ALLOWUNTRUSTED=yes
else
echo "Unknown distribution: $DIST"
exit 1
@ -99,7 +110,22 @@ fi
echo "using $NAME $OSNAME $DIST $ARCH $LOGNAME"
CCACHEDIR=/var/cache/pbuilder/ccache
case "$DIST" in
"trusty")
# ccache broken
;;
*)
CCACHEDIR=/var/cache/pbuilder/ccache
;;
esac
case "$DIST" in
"trusty" | "artful" | "experimental" | "unstable" )
export DEB_CC=gcc-7
export DEB_CXX=g++-7
;;
esac
export CCACHE_PREFIX=
export DEB_BUILD_OPTIONS=parallel=`nproc`

4
debian/control vendored
View File

@ -3,8 +3,8 @@ Priority: optional
Maintainer: Alexey Milovidov <milovidov@yandex-team.ru>
Build-Depends: debhelper (>= 9),
cmake,
gcc-6, g++-6,
default-libmysqlclient-dev | libmysqlclient-dev,
gcc-7 | gcc-6, g++-7 | g++-6,
libmariadbclient-dev | default-libmysqlclient-dev | libmysqlclient-dev,
libicu-dev,
libltdl-dev,
libreadline-dev,

31
debian/rules vendored
View File

@ -19,22 +19,28 @@ DEB_HOST_MULTIARCH ?= $(shell dpkg-architecture -qDEB_HOST_MULTIARCH)
#endif
#DEB_BUILD_OPTIONS+=parallel=$(THREADS_COUNT)
DEB_CC ?= gcc-6
DEB_CXX ?= g++-6
DEB_CLANG ?= $(shell which clang-6.0 || which clang-5.0 || which clang-4.0 || which clang || which clang-3.9 || which clang-3.8)
# CMAKE_FLAGS_ADD += -DINTERNAL_COMPILER_EXECUTABLE=$(basename $(DEB_CLANG)) # TODO: this is actual only if you will also change clang name in copy_clang_binaries.sh
CMAKE_FLAGS += -DENABLE_TESTS=0
DEB_BUILD_GNU_TYPE := $(shell dpkg-architecture -qDEB_BUILD_GNU_TYPE)
DEB_HOST_GNU_TYPE := $(shell dpkg-architecture -qDEB_HOST_GNU_TYPE)
DEB_CLANG ?= $(shell which clang-6.0 || which clang-5.0 || which clang-4.0 || which clang || which clang-3.9 || which clang-3.8)
# CMAKE_FLAGS += -DINTERNAL_COMPILER_EXECUTABLE=$(basename $(DEB_CLANG)) # TODO: this is actual only if you will also change clang name in copy_clang_binaries.sh
#DEB_CC ?= gcc-6
#DEB_CXX ?= g++-6
ifdef DEB_CXX
DEB_BUILD_GNU_TYPE := $(shell dpkg-architecture -qDEB_BUILD_GNU_TYPE)
DEB_HOST_GNU_TYPE := $(shell dpkg-architecture -qDEB_HOST_GNU_TYPE)
ifeq ($(DEB_BUILD_GNU_TYPE),$(DEB_HOST_GNU_TYPE))
CC := $(DEB_CC)
CXX := $(DEB_CXX)
CC := $(DEB_CC)
CXX := $(DEB_CXX)
else
CC := $(DEB_HOST_GNU_TYPE)-$(DEB_CC)
CXX := $(DEB_HOST_GNU_TYPE)-$(DEB_CXX)
CC := $(DEB_HOST_GNU_TYPE)-$(DEB_CC)
CXX := $(DEB_HOST_GNU_TYPE)-$(DEB_CXX)
endif
endif
CMAKE_FLAGS ?= -DCMAKE_CXX_COMPILER=`which $(CXX)` -DCMAKE_C_COMPILER=`which $(CC)` -DENABLE_TESTS=0 $(CMAKE_FLAGS_ADD)
CMAKE_FLAGS += -DCMAKE_CXX_COMPILER=`which $(CXX)` -DCMAKE_C_COMPILER=`which $(CC)`
ifndef DH_VERBOSE
CMAKE_FLAGS += -DCMAKE_VERBOSE_MAKEFILE=0
endif
@ -76,9 +82,10 @@ override_dh_install:
# In case building clickhouse-server, adding to package binary of clang, ld and header files - for dynamic compilation.
mkdir -p $(DESTDIR)/usr/share/clickhouse/bin $(DESTDIR)/usr/share/clickhouse/headers
ifeq ($(USE_INTERNAL_COMPILER),1)
CLANG=$(DEB_CLANG) debian/copy_clang_binaries.sh $(DESTDIR)/usr/share/clickhouse/bin/
CLANG=$(DEB_CLANG) ./copy_headers.sh . $(DESTDIR)/usr/share/clickhouse/headers
endif
# fake metrika files when private dir is empty
mkdir -p debian/tmp/etc/clickhouse-server/metrika

View File

@ -159,7 +159,7 @@ If the data differs on various replicas, first sync it, or delete this data on a
Rename the existing MergeTree table, then create a ReplicatedMergeTree table with the old name.
Move the data from the old table to the 'detached' subdirectory inside the directory with the new table data (``/var/lib/clickhouse/data/db_name/table_name/``).
Then run ALTER TABLE ATTACH PART on one of the replicas to add these data parts to the working set.
Then run ALTER TABLE ATTACH PARTITION on one of the replicas to add these data parts to the working set.
If exactly the same parts exist on the other replicas, they are added to the working set on them. If not, the parts are downloaded from the replica that has them.

View File

@ -163,7 +163,7 @@ ReplicatedSummingMergeTree
Переименуйте имеющуюся MergeTree таблицу, затем создайте со старым именем таблицу типа ReplicatedMergeTree.
Перенесите данные из старой таблицы в поддиректорию detached в директории с данными новой таблицы (``/var/lib/clickhouse/data/db_name/table_name/``).
Затем добавьте эти куски данных в рабочий набор с помощью выполнения запросов ALTER TABLE ATTACH PART на одной из реплик.
Затем добавьте эти куски данных в рабочий набор с помощью выполнения запросов ALTER TABLE ATTACH PARTITION на одной из реплик.
Преобразование из ReplicatedMergeTree в MergeTree
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -22,9 +22,9 @@ if (ENABLE_MYSQL)
find_path (MYSQL_INCLUDE_DIR NAMES mysql/mysql.h PATHS ${MYSQL_INCLUDE_PATHS})
if (USE_STATIC_LIBRARIES)
find_library (STATIC_MYSQLCLIENT_LIB libmysqlclient.a PATHS ${MYSQL_LIB_PATHS})
find_library (STATIC_MYSQLCLIENT_LIB mariadbclient mysqlclient PATHS ${MYSQL_LIB_PATHS})
else ()
find_library (MYSQLCLIENT_LIB mysqlclient PATHS ${MYSQL_LIB_PATHS})
find_library (MYSQLCLIENT_LIB mariadbclient mysqlclient PATHS ${MYSQL_LIB_PATHS})
endif ()
if (MYSQL_INCLUDE_DIR AND (STATIC_MYSQLCLIENT_LIB OR MYSQLCLIENT_LIB))

View File

@ -7,6 +7,9 @@ cd $CURDIR
source "./release_lib.sh"
DEB_CC=gcc-6
DEB_CXX=g++-6
CONTROL=debian/control
DEBUILD_NOSIGN_OPTIONS="-us -uc"
@ -58,7 +61,7 @@ if [ -z "$THREAD_COUNT" ] ; then
THREAD_COUNT=`nproc || grep -c ^processor /proc/cpuinfo`
fi
CMAKE_FLAGS_ADD+=" $LIBTCMALLOC_OPTS -DCMAKE_BUILD_TYPE=$CMAKE_BUILD_TYPE"
CMAKE_FLAGS+=" $LIBTCMALLOC_OPTS -DCMAKE_BUILD_TYPE=$CMAKE_BUILD_TYPE"
REVISION+=$VERSION_POSTFIX
echo -e "\nCurrent revision is $REVISION"
@ -66,4 +69,4 @@ echo -e "\nCurrent revision is $REVISION"
gen_changelog "$REVISION" "" "$AUTHOR" ""
# Build (only binary packages).
debuild -e PATH -e SSH_AUTH_SOCK -e DEB_BUILD_OPTIONS=parallel=$THREAD_COUNT -e DEB_CC -e DEB_CXX -e DEB_CLANG -e CMAKE_FLAGS_ADD="$CMAKE_FLAGS_ADD" -b ${DEBUILD_NOSIGN_OPTIONS} ${DEBUILD_NODEPS_OPTIONS}
debuild -e PATH -e SSH_AUTH_SOCK -e DEB_BUILD_OPTIONS=parallel=$THREAD_COUNT -e DEB_CC=$DEB_CC -e DEB_CXX=$DEB_CXX -e USE_INTERNAL_COMPILER=1 -e CMAKE_FLAGS="$CMAKE_FLAGS" -b ${DEBUILD_NOSIGN_OPTIONS} ${DEBUILD_NODEPS_OPTIONS}

View File

@ -1,3 +1,6 @@
set +e
function get_revision {
BASEDIR=$(dirname "${BASH_SOURCE[0]}")
grep "set(VERSION_REVISION" ${BASEDIR}/dbms/cmake/version.cmake | sed 's/^.*VERSION_REVISION \(.*\))$/\1/'
@ -25,7 +28,7 @@ function gen_revision_author {
while [ $succeeded -eq 0 ] && [ $attempts -le $max_attempts ]; do
attempts=$(($attempts + 1))
REVISION=$(($REVISION + 1))
( git_tag_grep=`git tag | grep "$VERSION_PREFIX$REVISION$VERSION_POSTFIX"` ) || true
git_tag_grep=`git tag | grep "$VERSION_PREFIX$REVISION$VERSION_POSTFIX"`
if [ "$git_tag_grep" == "" ]; then
succeeded=1
fi
@ -36,13 +39,13 @@ function gen_revision_author {
fi
auto_message="Auto version update to"
( git_log_grep=`git log --oneline --max-count=1 | grep "$auto_message"` ) || true
git_log_grep=`git log --oneline --max-count=1 | grep "$auto_message"`
if [ "$git_log_grep" == "" ]; then
tag="$VERSION_PREFIX$REVISION$VERSION_POSTFIX"
# First tag for correct git describe
echo -e "\nTrying to create tag: $tag"
git tag -a "$tag" -m "$tag" || true
git tag -a "$tag" -m "$tag"
git_describe=`git describe`
sed -i -- "s/VERSION_REVISION .*)/VERSION_REVISION $REVISION)/g;s/VERSION_DESCRIBE .*)/VERSION_DESCRIBE $git_describe)/g" dbms/cmake/version.cmake

@ -1 +0,0 @@
Subproject commit cc6a20a9711dc7083c2a8ea79c7127d6fa87faee

View File

@ -14,6 +14,8 @@ REMOTE_NAME="registry.yandex.net/${FULL_NAME}"
DOCKER_HASH="$2"
if [[ -z "$1" ]]
then
git clone https://github.com/yandex/clickhouse-presentations.git presentations || true
git --work-tree=$(readlink -f presentations) --git-dir=$(readlink -f presentations)/.git pull
gulp clean
gulp build
docker build -t "${FULL_NAME}" "${BASE_DIR}"