Merge branch 'master' into header-in-input-streams

This commit is contained in:
Alexey Milovidov 2018-01-10 03:05:51 +03:00
commit 550053a0ca
58 changed files with 1176 additions and 609 deletions

19
cmake/find_gtest.cmake Normal file
View File

@ -0,0 +1,19 @@
option (USE_INTERNAL_GTEST_LIBRARY "Set to FALSE to use system Google Test instead of bundled" ${NOT_UNBUNDLED})
if (USE_INTERNAL_GTEST_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest/CMakeLists.txt")
message (WARNING "submodule contrib/googletest is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_GTEST_LIBRARY 0)
endif ()
if (NOT USE_INTERNAL_GTEST_LIBRARY)
find_library (GTEST_LIBRARY gtest_main)
find_path (GTEST_INCLUDE_DIR NAMES /gtest/gtest.h PATHS ${GTEST_INCLUDE_PATHS})
endif ()
if (GTEST_LIBRARY AND GTEST_INCLUDE_DIR)
else ()
set (USE_INTERNAL_GTEST_LIBRARY 1)
set (GTEST_LIBRARY gtest_main)
endif ()
message (STATUS "Using gtest: ${GTEST_INCLUDE_DIR} : ${GTEST_LIBRARY}")

2
contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit 3a986afbb977fa13582991ce8f2c0b2045ffaa33 Subproject commit c3d50eb613704fb9c8ab3bce95a88275cb5875b7

View File

@ -219,6 +219,21 @@ target_include_directories (clickhouse_common_io BEFORE PRIVATE ${COMMON_INCLUDE
add_subdirectory (tests) add_subdirectory (tests)
if (ENABLE_TESTS) if (ENABLE_TESTS)
include (${PROJECT_SOURCE_DIR}/cmake/find_gtest.cmake)
if (USE_INTERNAL_GTEST_LIBRARY)
# Google Test from sources
add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
# avoid problems with <regexp.h>
target_compile_definitions (gtest INTERFACE GTEST_HAS_POSIX_RE=0)
target_include_directories (gtest INTERFACE ${ClickHouse_SOURCE_DIR}/contrib/googletest/include)
endif ()
macro(grep_gtest_sources BASE_DIR DST_VAR)
# Cold match files that are not in tests/ directories
file(GLOB_RECURSE "${DST_VAR}" RELATIVE "${BASE_DIR}" "gtest*.cpp")
endmacro()
# attach all dbms gtest sources # attach all dbms gtest sources
grep_gtest_sources(${ClickHouse_SOURCE_DIR}/dbms dbms_gtest_sources) grep_gtest_sources(${ClickHouse_SOURCE_DIR}/dbms dbms_gtest_sources)
add_executable(unit_tests_dbms ${dbms_gtest_sources}) add_executable(unit_tests_dbms ${dbms_gtest_sources})

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh: # This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54330-testing) set(VERSION_DESCRIBE v1.1.54331-testing)
set(VERSION_REVISION 54330) set(VERSION_REVISION 54331)
# end of autochange # end of autochange
set (VERSION_MAJOR 1) set (VERSION_MAJOR 1)

View File

@ -50,16 +50,16 @@ public:
return type_res; return type_res;
} }
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{ {
if (this->data(place).value.changeIfBetter(*columns[1], row_num)) if (this->data(place).value.changeIfBetter(*columns[1], row_num, arena))
this->data(place).result.change(*columns[0], row_num); this->data(place).result.change(*columns[0], row_num, arena);
} }
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{ {
if (this->data(place).value.changeIfBetter(this->data(rhs).value)) if (this->data(place).value.changeIfBetter(this->data(rhs).value, arena))
this->data(place).result.change(this->data(rhs).result); this->data(place).result.change(this->data(rhs).result, arena);
} }
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -68,10 +68,10 @@ public:
this->data(place).value.write(buf, *type_val); this->data(place).value.write(buf, *type_val);
} }
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{ {
this->data(place).result.read(buf, *type_res); this->data(place).result.read(buf, *type_res, arena);
this->data(place).value.read(buf, *type_val); this->data(place).value.read(buf, *type_val, arena);
} }
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override

View File

@ -24,12 +24,13 @@ namespace DB
template <typename T> template <typename T>
struct SingleValueDataFixed struct SingleValueDataFixed
{ {
private:
using Self = SingleValueDataFixed<T>; using Self = SingleValueDataFixed<T>;
bool has_value = false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf. bool has_value = false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf.
T value; T value;
public:
bool has() const bool has() const
{ {
return has_value; return has_value;
@ -50,7 +51,7 @@ struct SingleValueDataFixed
writeBinary(value, buf); writeBinary(value, buf);
} }
void read(ReadBuffer & buf, const IDataType & /*data_type*/) void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena *)
{ {
readBinary(has_value, buf); readBinary(has_value, buf);
if (has()) if (has())
@ -58,96 +59,96 @@ struct SingleValueDataFixed
} }
void change(const IColumn & column, size_t row_num) void change(const IColumn & column, size_t row_num, Arena *)
{ {
has_value = true; has_value = true;
value = static_cast<const ColumnVector<T> &>(column).getData()[row_num]; value = static_cast<const ColumnVector<T> &>(column).getData()[row_num];
} }
/// Assuming to.has() /// Assuming to.has()
void change(const Self & to) void change(const Self & to, Arena *)
{ {
has_value = true; has_value = true;
value = to.value; value = to.value;
} }
bool changeFirstTime(const IColumn & column, size_t row_num) bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has()) if (!has())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeFirstTime(const Self & to) bool changeFirstTime(const Self & to, Arena * arena)
{ {
if (!has() && to.has()) if (!has() && to.has())
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeEveryTime(const IColumn & column, size_t row_num) bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena)
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
bool changeEveryTime(const Self & to) bool changeEveryTime(const Self & to, Arena * arena)
{ {
if (to.has()) if (to.has())
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfLess(const IColumn & column, size_t row_num) bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has() || static_cast<const ColumnVector<T> &>(column).getData()[row_num] < value) if (!has() || static_cast<const ColumnVector<T> &>(column).getData()[row_num] < value)
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfLess(const Self & to) bool changeIfLess(const Self & to, Arena * arena)
{ {
if (to.has() && (!has() || to.value < value)) if (to.has() && (!has() || to.value < value))
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfGreater(const IColumn & column, size_t row_num) bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has() || static_cast<const ColumnVector<T> &>(column).getData()[row_num] > value) if (!has() || static_cast<const ColumnVector<T> &>(column).getData()[row_num] > value)
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfGreater(const Self & to) bool changeIfGreater(const Self & to, Arena * arena)
{ {
if (to.has() && (!has() || to.value > value)) if (to.has() && (!has() || to.value > value))
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
@ -169,27 +170,23 @@ struct SingleValueDataFixed
/** For strings. Short strings are stored in the object itself, and long strings are allocated separately. /** For strings. Short strings are stored in the object itself, and long strings are allocated separately.
* NOTE It could also be suitable for arrays of numbers. * NOTE It could also be suitable for arrays of numbers.
*/ */
struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString struct SingleValueDataString
{ {
private:
using Self = SingleValueDataString; using Self = SingleValueDataString;
Int32 size = -1; /// -1 indicates that there is no value. Int32 size = -1; /// -1 indicates that there is no value.
Int32 capacity = 0; /// power of two or zero
char * large_data;
public:
static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64; static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64;
static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size); static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size) - sizeof(capacity) - sizeof(large_data);
union __attribute__((__packed__, __aligned__(1))) private:
{ char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
char * __attribute__((__packed__, __aligned__(1))) large_data;
};
~SingleValueDataString()
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
}
public:
bool has() const bool has() const
{ {
return size >= 0; return size >= 0;
@ -220,7 +217,7 @@ struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString
buf.write(getData(), size); buf.write(getData(), size);
} }
void read(ReadBuffer & buf, const IDataType & /*data_type*/) void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena * arena)
{ {
Int32 rhs_size; Int32 rhs_size;
readBinary(rhs_size, buf); readBinary(rhs_size, buf);
@ -229,8 +226,7 @@ struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString
{ {
if (rhs_size <= MAX_SMALL_STRING_SIZE) if (rhs_size <= MAX_SMALL_STRING_SIZE)
{ {
if (size > MAX_SMALL_STRING_SIZE) /// Don't free large_data here.
free(large_data);
size = rhs_size; size = rhs_size;
@ -239,12 +235,11 @@ struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString
} }
else else
{ {
if (size < rhs_size) if (capacity < rhs_size)
{ {
if (size > MAX_SMALL_STRING_SIZE) capacity = static_cast<UInt32>(roundUpToPowerOfTwoOrZero(rhs_size));
free(large_data); /// Don't free large_data here.
large_data = arena->alloc(capacity);
large_data = reinterpret_cast<char *>(malloc(rhs_size));
} }
size = rhs_size; size = rhs_size;
@ -253,22 +248,19 @@ struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString
} }
else else
{ {
if (size > MAX_SMALL_STRING_SIZE) /// Don't free large_data here.
free(large_data);
size = rhs_size; size = rhs_size;
} }
} }
/// Assuming to.has() /// Assuming to.has()
void changeImpl(StringRef value) void changeImpl(StringRef value, Arena * arena)
{ {
Int32 value_size = value.size; Int32 value_size = value.size;
if (value_size <= MAX_SMALL_STRING_SIZE) if (value_size <= MAX_SMALL_STRING_SIZE)
{ {
if (size > MAX_SMALL_STRING_SIZE) /// Don't free large_data here.
free(large_data);
size = value_size; size = value_size;
if (size > 0) if (size > 0)
@ -276,12 +268,11 @@ struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString
} }
else else
{ {
if (size < value_size) if (capacity < value_size)
{ {
if (size > MAX_SMALL_STRING_SIZE) /// Don't free large_data here.
free(large_data); capacity = roundUpToPowerOfTwoOrZero(value_size);
large_data = arena->alloc(capacity);
large_data = reinterpret_cast<char *>(malloc(value.size));
} }
size = value_size; size = value_size;
@ -289,93 +280,93 @@ struct __attribute__((__packed__, __aligned__(1))) SingleValueDataString
} }
} }
void change(const IColumn & column, size_t row_num) void change(const IColumn & column, size_t row_num, Arena * arena)
{ {
changeImpl(static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num)); changeImpl(static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num), arena);
} }
void change(const Self & to) void change(const Self & to, Arena * arena)
{ {
changeImpl(to.getStringRef()); changeImpl(to.getStringRef(), arena);
} }
bool changeFirstTime(const IColumn & column, size_t row_num) bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has()) if (!has())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeFirstTime(const Self & to) bool changeFirstTime(const Self & to, Arena * arena)
{ {
if (!has() && to.has()) if (!has() && to.has())
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeEveryTime(const IColumn & column, size_t row_num) bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena)
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
bool changeEveryTime(const Self & to) bool changeEveryTime(const Self & to, Arena * arena)
{ {
if (to.has()) if (to.has())
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfLess(const IColumn & column, size_t row_num) bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has() || static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) < getStringRef()) if (!has() || static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) < getStringRef())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfLess(const Self & to) bool changeIfLess(const Self & to, Arena * arena)
{ {
if (to.has() && (!has() || to.getStringRef() < getStringRef())) if (to.has() && (!has() || to.getStringRef() < getStringRef()))
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfGreater(const IColumn & column, size_t row_num) bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has() || static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) > getStringRef()) if (!has() || static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) > getStringRef())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfGreater(const Self & to) bool changeIfGreater(const Self & to, Arena * arena)
{ {
if (to.has() && (!has() || to.getStringRef() > getStringRef())) if (to.has() && (!has() || to.getStringRef() > getStringRef()))
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
@ -401,10 +392,12 @@ static_assert(
/// For any other value types. /// For any other value types.
struct SingleValueDataGeneric struct SingleValueDataGeneric
{ {
private:
using Self = SingleValueDataGeneric; using Self = SingleValueDataGeneric;
Field value; Field value;
public:
bool has() const bool has() const
{ {
return !value.isNull(); return !value.isNull();
@ -429,7 +422,7 @@ struct SingleValueDataGeneric
writeBinary(false, buf); writeBinary(false, buf);
} }
void read(ReadBuffer & buf, const IDataType & data_type) void read(ReadBuffer & buf, const IDataType & data_type, Arena *)
{ {
bool is_not_null; bool is_not_null;
readBinary(is_not_null, buf); readBinary(is_not_null, buf);
@ -438,60 +431,60 @@ struct SingleValueDataGeneric
data_type.deserializeBinary(value, buf); data_type.deserializeBinary(value, buf);
} }
void change(const IColumn & column, size_t row_num) void change(const IColumn & column, size_t row_num, Arena *)
{ {
column.get(row_num, value); column.get(row_num, value);
} }
void change(const Self & to) void change(const Self & to, Arena *)
{ {
value = to.value; value = to.value;
} }
bool changeFirstTime(const IColumn & column, size_t row_num) bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has()) if (!has())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeFirstTime(const Self & to) bool changeFirstTime(const Self & to, Arena * arena)
{ {
if (!has() && to.has()) if (!has() && to.has())
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeEveryTime(const IColumn & column, size_t row_num) bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena)
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
bool changeEveryTime(const Self & to) bool changeEveryTime(const Self & to, Arena * arena)
{ {
if (to.has()) if (to.has())
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfLess(const IColumn & column, size_t row_num) bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has()) if (!has())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
@ -508,22 +501,22 @@ struct SingleValueDataGeneric
} }
} }
bool changeIfLess(const Self & to) bool changeIfLess(const Self & to, Arena * arena)
{ {
if (to.has() && (!has() || to.value < value)) if (to.has() && (!has() || to.value < value))
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
return false; return false;
} }
bool changeIfGreater(const IColumn & column, size_t row_num) bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (!has()) if (!has())
{ {
change(column, row_num); change(column, row_num, arena);
return true; return true;
} }
else else
@ -540,11 +533,11 @@ struct SingleValueDataGeneric
} }
} }
bool changeIfGreater(const Self & to) bool changeIfGreater(const Self & to, Arena * arena)
{ {
if (to.has() && (!has() || to.value > value)) if (to.has() && (!has() || to.value > value))
{ {
change(to); change(to, arena);
return true; return true;
} }
else else
@ -573,8 +566,8 @@ struct AggregateFunctionMinData : Data
{ {
using Self = AggregateFunctionMinData<Data>; using Self = AggregateFunctionMinData<Data>;
bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeIfLess(column, row_num); } bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfLess(column, row_num, arena); }
bool changeIfBetter(const Self & to) { return this->changeIfLess(to); } bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfLess(to, arena); }
static const char * name() { return "min"; } static const char * name() { return "min"; }
}; };
@ -584,8 +577,8 @@ struct AggregateFunctionMaxData : Data
{ {
using Self = AggregateFunctionMaxData<Data>; using Self = AggregateFunctionMaxData<Data>;
bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeIfGreater(column, row_num); } bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfGreater(column, row_num, arena); }
bool changeIfBetter(const Self & to) { return this->changeIfGreater(to); } bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfGreater(to, arena); }
static const char * name() { return "max"; } static const char * name() { return "max"; }
}; };
@ -595,8 +588,8 @@ struct AggregateFunctionAnyData : Data
{ {
using Self = AggregateFunctionAnyData<Data>; using Self = AggregateFunctionAnyData<Data>;
bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeFirstTime(column, row_num); } bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); }
bool changeIfBetter(const Self & to) { return this->changeFirstTime(to); } bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); }
static const char * name() { return "any"; } static const char * name() { return "any"; }
}; };
@ -606,8 +599,8 @@ struct AggregateFunctionAnyLastData : Data
{ {
using Self = AggregateFunctionAnyLastData<Data>; using Self = AggregateFunctionAnyLastData<Data>;
bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeEveryTime(column, row_num); } bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); }
bool changeIfBetter(const Self & to) { return this->changeEveryTime(to); } bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); }
static const char * name() { return "anyLast"; } static const char * name() { return "anyLast"; }
}; };
@ -625,7 +618,7 @@ struct AggregateFunctionAnyHeavyData : Data
using Self = AggregateFunctionAnyHeavyData<Data>; using Self = AggregateFunctionAnyHeavyData<Data>;
bool changeIfBetter(const IColumn & column, size_t row_num) bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena)
{ {
if (this->isEqualTo(column, row_num)) if (this->isEqualTo(column, row_num))
{ {
@ -635,7 +628,7 @@ struct AggregateFunctionAnyHeavyData : Data
{ {
if (counter == 0) if (counter == 0)
{ {
this->change(column, row_num); this->change(column, row_num, arena);
++counter; ++counter;
return true; return true;
} }
@ -645,7 +638,7 @@ struct AggregateFunctionAnyHeavyData : Data
return false; return false;
} }
bool changeIfBetter(const Self & to) bool changeIfBetter(const Self & to, Arena * arena)
{ {
if (this->isEqualTo(to)) if (this->isEqualTo(to))
{ {
@ -655,7 +648,7 @@ struct AggregateFunctionAnyHeavyData : Data
{ {
if (counter < to.counter) if (counter < to.counter)
{ {
this->change(to); this->change(to, arena);
return true; return true;
} }
else else
@ -670,9 +663,9 @@ struct AggregateFunctionAnyHeavyData : Data
writeBinary(counter, buf); writeBinary(counter, buf);
} }
void read(ReadBuffer & buf, const IDataType & data_type) void read(ReadBuffer & buf, const IDataType & data_type, Arena * arena)
{ {
Data::read(buf, data_type); Data::read(buf, data_type, arena);
readBinary(counter, buf); readBinary(counter, buf);
} }
@ -705,14 +698,14 @@ public:
return type; return type;
} }
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{ {
this->data(place).changeIfBetter(*columns[0], row_num); this->data(place).changeIfBetter(*columns[0], row_num, arena);
} }
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{ {
this->data(place).changeIfBetter(this->data(rhs)); this->data(place).changeIfBetter(this->data(rhs), arena);
} }
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -720,9 +713,9 @@ public:
this->data(place).write(buf, *type.get()); this->data(place).write(buf, *type.get());
} }
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{ {
this->data(place).read(buf, *type.get()); this->data(place).read(buf, *type.get(), arena);
} }
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override

View File

@ -64,9 +64,9 @@ void Connection::connect()
{ {
socket = std::make_unique<Poco::Net::StreamSocket>(); socket = std::make_unique<Poco::Net::StreamSocket>();
} }
socket->connect(resolved_address, connect_timeout); socket->connect(resolved_address, timeouts.connection_timeout);
socket->setReceiveTimeout(receive_timeout); socket->setReceiveTimeout(timeouts.receive_timeout);
socket->setSendTimeout(send_timeout); socket->setSendTimeout(timeouts.send_timeout);
socket->setNoDelay(true); socket->setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(*socket); in = std::make_shared<ReadBufferFromPocoSocket>(*socket);

View File

@ -17,6 +17,7 @@
#include <DataStreams/BlockStreamProfileInfo.h> #include <DataStreams/BlockStreamProfileInfo.h>
#include <IO/CompressionSettings.h> #include <IO/CompressionSettings.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Settings.h> #include <Interpreters/Settings.h>
#include <Interpreters/TablesStatus.h> #include <Interpreters/TablesStatus.h>
@ -54,12 +55,10 @@ class Connection : private boost::noncopyable
public: public:
Connection(const String & host_, UInt16 port_, const String & default_database_, Connection(const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_, const String & user_, const String & password_,
const ConnectionTimeouts & timeouts_,
const String & client_name_ = "client", const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Encryption encryption_ = Protocol::Encryption::Disable, Protocol::Encryption encryption_ = Protocol::Encryption::Disable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0)) Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
: :
host(host_), port(port_), default_database(default_database_), host(host_), port(port_), default_database(default_database_),
@ -67,7 +66,7 @@ public:
client_name(client_name_), client_name(client_name_),
compression(compression_), compression(compression_),
encryption(encryption_), encryption(encryption_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_), timeouts(timeouts_),
sync_request_timeout(sync_request_timeout_), sync_request_timeout(sync_request_timeout_),
log_wrapper(*this) log_wrapper(*this)
{ {
@ -82,12 +81,10 @@ public:
Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_, Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
const String & default_database_, const String & default_database_,
const String & user_, const String & password_, const String & user_, const String & password_,
const ConnectionTimeouts & timeouts_,
const String & client_name_ = "client", const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Encryption encryption_ = Protocol::Encryption::Disable, Protocol::Encryption encryption_ = Protocol::Encryption::Disable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0)) Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
: :
host(host_), port(port_), host(host_), port(port_),
@ -97,7 +94,7 @@ public:
client_name(client_name_), client_name(client_name_),
compression(compression_), compression(compression_),
encryption(encryption_), encryption(encryption_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_), timeouts(timeouts_),
sync_request_timeout(sync_request_timeout_), sync_request_timeout(sync_request_timeout_),
log_wrapper(*this) log_wrapper(*this)
{ {
@ -233,9 +230,7 @@ private:
*/ */
ThrottlerPtr throttler; ThrottlerPtr throttler;
Poco::Timespan connect_timeout; ConnectionTimeouts timeouts;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
Poco::Timespan sync_request_timeout; Poco::Timespan sync_request_timeout;
/// From where to read query execution result. /// From where to read query execution result.

View File

@ -3,7 +3,7 @@
#include <Common/PoolBase.h> #include <Common/PoolBase.h>
#include <Client/Connection.h> #include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
namespace DB namespace DB
{ {
@ -48,17 +48,15 @@ public:
const String & host_, UInt16 port_, const String & host_, UInt16 port_,
const String & default_database_, const String & default_database_,
const String & user_, const String & password_, const String & user_, const String & password_,
const ConnectionTimeouts & timeouts,
const String & client_name_ = "client", const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Encryption encryption_ = Protocol::Encryption::Disable, Protocol::Encryption encryption_ = Protocol::Encryption::Disable)
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")), : Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_), port(port_), default_database(default_database_), host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), resolved_address(host_, port_), user(user_), password(password_), resolved_address(host_, port_),
client_name(client_name_), compression(compression_), encryption(encryption_), client_name(client_name_), compression(compression_), encryption(encryption_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_) timeouts(timeouts)
{ {
} }
@ -66,17 +64,15 @@ public:
const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_, const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
const String & default_database_, const String & default_database_,
const String & user_, const String & password_, const String & user_, const String & password_,
const ConnectionTimeouts & timeouts,
const String & client_name_ = "client", const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Encryption encryption_ = Protocol::Encryption::Disable, Protocol::Encryption encryption_ = Protocol::Encryption::Disable)
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")), : Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_), port(port_), default_database(default_database_), host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), resolved_address(resolved_address_), user(user_), password(password_), resolved_address(resolved_address_),
client_name(client_name_), compression(compression_), encryption(encryption_), client_name(client_name_), compression(compression_), encryption(encryption_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_) timeouts(timeouts)
{ {
} }
@ -105,9 +101,8 @@ protected:
{ {
return std::make_shared<Connection>( return std::make_shared<Connection>(
host, port, resolved_address, host, port, resolved_address,
default_database, user, password, default_database, user, password, timeouts,
client_name, compression, encryption, client_name, compression, encryption);
connect_timeout, receive_timeout, send_timeout);
} }
private: private:
@ -126,9 +121,7 @@ private:
Protocol::Compression compression; /// Whether to compress data when interacting with the server. Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Encryption encryption; /// Whether to encrypt data when interacting with the server. Protocol::Encryption encryption; /// Whether to encrypt data when interacting with the server.
Poco::Timespan connect_timeout; ConnectionTimeouts timeouts;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
}; };
} }

View File

@ -71,6 +71,9 @@
#define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500 #define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
#define ALWAYS_INLINE __attribute__((__always_inline__)) #define ALWAYS_INLINE __attribute__((__always_inline__))
#define NO_INLINE __attribute__((__noinline__)) #define NO_INLINE __attribute__((__noinline__))

View File

@ -7,7 +7,7 @@
#include <Common/isLocalAddress.h> #include <Common/isLocalAddress.h>
#include <memory> #include <memory>
#include <ext/range.h> #include <ext/range.h>
#include <IO/ConnectionTimeouts.h>
namespace DB namespace DB
{ {
@ -21,11 +21,13 @@ namespace ErrorCodes
static const size_t MAX_CONNECTIONS = 16; static const size_t MAX_CONNECTIONS = 16;
static ConnectionPoolWithFailoverPtr createPool( static ConnectionPoolWithFailoverPtr createPool(
const std::string & host, UInt16 port, const std::string & db, const std::string & user, const std::string & password) const std::string & host, UInt16 port, const std::string & db,
const std::string & user, const std::string & password, const Context & context)
{ {
auto timeouts = ConnectionTimeouts::getTCPTimeouts(context.getSettingsRef());
ConnectionPoolPtrs pools; ConnectionPoolPtrs pools;
pools.emplace_back(std::make_shared<ConnectionPool>( pools.emplace_back(std::make_shared<ConnectionPool>(
MAX_CONNECTIONS, host, port, db, user, password, "ClickHouseDictionarySource")); MAX_CONNECTIONS, host, port, db, user, password, timeouts, "ClickHouseDictionarySource"));
return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM); return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM);
} }
@ -46,7 +48,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
sample_block{sample_block}, context(context), sample_block{sample_block}, context(context),
is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))}, is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))},
pool{is_local ? nullptr : createPool(host, port, db, user, password)}, pool{is_local ? nullptr : createPool(host, port, db, user, password, context)},
load_all_query{query_builder.composeLoadAllQuery()} load_all_query{query_builder.composeLoadAllQuery()}
{} {}
@ -59,7 +61,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
sample_block{other.sample_block}, context(other.context), sample_block{other.sample_block}, context(other.context),
is_local{other.is_local}, is_local{other.is_local},
pool{is_local ? nullptr : createPool(host, port, db, user, password)}, pool{is_local ? nullptr : createPool(host, port, db, user, password, context)},
load_all_query{other.load_all_query} load_all_query{other.load_all_query}
{} {}

View File

@ -8,7 +8,7 @@
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <Dictionaries/DictionarySourceHelpers.h> #include <Dictionaries/DictionarySourceHelpers.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <IO/ConnectionTimeouts.h>
namespace DB namespace DB
{ {
@ -24,7 +24,8 @@ HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_stru
url{config.getString(config_prefix + ".url", "")}, url{config.getString(config_prefix + ".url", "")},
format{config.getString(config_prefix + ".format")}, format{config.getString(config_prefix + ".format")},
sample_block{sample_block}, sample_block{sample_block},
context(context) context(context),
timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
{ {
} }
@ -34,7 +35,8 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
url{other.url}, url{other.url},
format{other.format}, format{other.format},
sample_block{other.sample_block}, sample_block{other.sample_block},
context(other.context) context(other.context),
timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
{ {
} }
@ -42,7 +44,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
{ {
LOG_TRACE(log, "loadAll " + toString()); LOG_TRACE(log, "loadAll " + toString());
Poco::URI uri(url); Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET); auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts);
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
} }
@ -59,7 +62,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
}; };
Poco::URI uri(url); Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback); auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback, timeouts);
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
} }
@ -77,7 +81,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(
}; };
Poco::URI uri(url); Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback); auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback, timeouts);
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
} }

View File

@ -3,7 +3,7 @@
#include <Dictionaries/IDictionarySource.h> #include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h> #include <Dictionaries/DictionaryStructure.h>
#include <common/LocalDateTime.h> #include <common/LocalDateTime.h>
#include <IO/ConnectionTimeouts.h>
namespace Poco { class Logger; } namespace Poco { class Logger; }
@ -48,6 +48,7 @@ private:
const std::string format; const std::string format;
Block sample_block; Block sample_block;
const Context & context; const Context & context;
ConnectionTimeouts timeouts;
}; };
} }

View File

@ -0,0 +1,52 @@
#pragma once
#include <Poco/Timespan.h>
#include <Interpreters/Settings.h>
namespace DB
{
struct ConnectionTimeouts
{
Poco::Timespan connection_timeout;
Poco::Timespan send_timeout;
Poco::Timespan receive_timeout;
ConnectionTimeouts() = default;
ConnectionTimeouts(const Poco::Timespan & connection_timeout_,
const Poco::Timespan & send_timeout_,
const Poco::Timespan & receive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_)
{
}
static Poco::Timespan saturate(const Poco::Timespan & timespan, const Poco::Timespan & limit)
{
if (limit.totalMicroseconds() == 0)
return timespan;
else
return (timespan > limit) ? limit : timespan;
}
ConnectionTimeouts getSaturated(const Poco::Timespan & limit) const
{
return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit),
saturate(receive_timeout, limit));
}
static ConnectionTimeouts getTCPTimeouts(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout);
}
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings)
{
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout);
}
};
}

View File

@ -27,8 +27,8 @@ namespace ErrorCodes
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri, ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method_, const std::string & method_,
OutStreamCallback out_stream_callback, OutStreamCallback out_stream_callback,
size_t buffer_size_, const ConnectionTimeouts & timeouts,
const HTTPTimeouts & timeouts) size_t buffer_size_)
: ReadBuffer(nullptr, 0), : ReadBuffer(nullptr, 0),
uri{uri}, uri{uri},
method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}, method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET},

View File

@ -4,6 +4,7 @@
#include <Poco/Net/HTTPClientSession.h> #include <Poco/Net/HTTPClientSession.h>
#include <Poco/URI.h> #include <Poco/URI.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 #define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
@ -13,13 +14,6 @@ namespace DB
const int HTTP_TOO_MANY_REQUESTS = 429; const int HTTP_TOO_MANY_REQUESTS = 429;
struct HTTPTimeouts
{
Poco::Timespan connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0);
Poco::Timespan send_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0);
Poco::Timespan receive_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0);
};
/** Perform HTTP POST request and provide response to read. /** Perform HTTP POST request and provide response to read.
*/ */
@ -28,7 +22,7 @@ class ReadWriteBufferFromHTTP : public ReadBuffer
private: private:
Poco::URI uri; Poco::URI uri;
std::string method; std::string method;
HTTPTimeouts timeouts; ConnectionTimeouts timeouts;
bool is_ssl; bool is_ssl;
std::unique_ptr<Poco::Net::HTTPClientSession> session; std::unique_ptr<Poco::Net::HTTPClientSession> session;
@ -38,12 +32,12 @@ private:
public: public:
using OutStreamCallback = std::function<void(std::ostream &)>; using OutStreamCallback = std::function<void(std::ostream &)>;
ReadWriteBufferFromHTTP( explicit ReadWriteBufferFromHTTP(
const Poco::URI & uri, const Poco::URI & uri,
const std::string & method = {}, const std::string & method = {},
OutStreamCallback out_stream_callback = {}, OutStreamCallback out_stream_callback = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, const ConnectionTimeouts & timeouts = {},
const HTTPTimeouts & timeouts = {}); size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override; bool nextImpl() override;
}; };

View File

@ -37,9 +37,12 @@ public:
{ {
std::make_pair("action", "read"), std::make_pair("action", "read"),
std::make_pair("path", path), std::make_pair("path", path),
std::make_pair("compress", (compress ? "true" : "false"))}); std::make_pair("compress", (compress ? "true" : "false"))
});
impl = std::make_unique<ReadWriteBufferFromHTTP>(uri, std::string(), ReadWriteBufferFromHTTP::OutStreamCallback(), buffer_size, HTTPTimeouts{connection_timeout, send_timeout, receive_timeout}); ConnectionTimeouts timeouts(connection_timeout, send_timeout, receive_timeout);
ReadWriteBufferFromHTTP::OutStreamCallback callback;
impl = std::make_unique<ReadWriteBufferFromHTTP>(uri, std::string(), callback, timeouts, buffer_size);
} }
bool nextImpl() override bool nextImpl() override
@ -56,7 +59,7 @@ public:
const std::string & host, const std::string & host,
int port, int port,
const std::string & path, const std::string & path,
size_t timeout = 0) const ConnectionTimeouts & timeouts)
{ {
Poco::URI uri; Poco::URI uri;
uri.setScheme("http"); uri.setScheme("http");
@ -67,7 +70,7 @@ public:
std::make_pair("action", "list"), std::make_pair("action", "list"),
std::make_pair("path", path)}); std::make_pair("path", path)});
ReadWriteBufferFromHTTP in(uri, {}, {}, {}, HTTPTimeouts{timeout}); ReadWriteBufferFromHTTP in(uri, {}, {}, timeouts);
std::vector<std::string> files; std::vector<std::string> files;
while (!in.eof()) while (!in.eof())

View File

@ -210,10 +210,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
settings.distributed_connections_pool_size, settings.distributed_connections_pool_size,
address.host_name, address.port, address.resolved_address, address.host_name, address.port, address.resolved_address,
address.default_database, address.user, address.password, address.default_database, address.user, address.password,
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable, ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
saturate(settings.connect_timeout, settings.limits.max_execution_time), "server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
info.pool = std::make_shared<ConnectionPoolWithFailover>( info.pool = std::make_shared<ConnectionPoolWithFailover>(
std::move(pools), settings.load_balancing, settings.connections_with_failover_max_tries); std::move(pools), settings.load_balancing, settings.connections_with_failover_max_tries);
@ -289,10 +287,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
settings.distributed_connections_pool_size, settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address, replica.host_name, replica.port, replica.resolved_address,
replica.default_database, replica.user, replica.password, replica.default_database, replica.user, replica.password,
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable, ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time), "server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
} }
} }
@ -348,10 +344,8 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
settings.distributed_connections_pool_size, settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address, replica.host_name, replica.port, replica.resolved_address,
replica.default_database, replica.user, replica.password, replica.default_database, replica.user, replica.password,
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable, ConnectionTimeouts::getHTTPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time), "server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
} }
} }

View File

@ -101,7 +101,7 @@ void InterpreterSelectQuery::init(const BlockInputStreamPtr & input, const Names
} }
} }
if (is_first_select_inside_union_all && (hasAsterisk())) if (is_first_select_inside_union_all && hasAsterisk())
{ {
basicInit(input); basicInit(input);

View File

@ -1150,7 +1150,10 @@ private:
++rows_added; ++rows_added;
if (rows_added == max_block_size) if (rows_added == max_block_size)
{
++it;
break; break;
}
} }
return rows_added; return rows_added;

View File

@ -300,7 +300,11 @@ struct Settings
/* Timeout for flushing data from streaming storages. */ \ /* Timeout for flushing data from streaming storages. */ \
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \ M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \
/* Schema identifier (used by schema-based formats) */ \ /* Schema identifier (used by schema-based formats) */ \
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
\
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \
M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \
/// Possible limits for query execution. /// Possible limits for query execution.

View File

@ -16,8 +16,8 @@ namespace ErrorCodes
{ {
extern const int UNION_ALL_COLUMN_ALIAS_MISMATCH; extern const int UNION_ALL_COLUMN_ALIAS_MISMATCH;
extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH; extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
extern const int UNKNOWN_IDENTIFIER;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int THERE_IS_NO_COLUMN;
} }
@ -73,68 +73,74 @@ void ASTSelectQuery::renameColumns(const ASTSelectQuery & source)
void ASTSelectQuery::rewriteSelectExpressionList(const Names & required_column_names) void ASTSelectQuery::rewriteSelectExpressionList(const Names & required_column_names)
{ {
/// All columns are kept if we have DISTINCT.
if (distinct)
return;
/** Always keep columns that contain arrayJoin inside.
* In addition, keep all columns in 'required_column_names'.
* If SELECT has at least one asterisk, replace it with the rest of required_column_names
* and ignore all other asterisks.
* We must keep columns in same related order.
*/
/// Analyze existing expression list.
using ASTAndPosition = std::pair<ASTPtr, size_t>;
std::map<String, ASTAndPosition> columns_with_array_join;
std::map<String, ASTAndPosition> other_required_columns_in_select;
ASTAndPosition asterisk;
size_t position = 0;
for (const auto & child : select_expression_list->children)
{
if (typeid_cast<const ASTAsterisk *>(child.get()))
{
if (!asterisk.first)
asterisk = { child, position };
}
else
{
auto name = child->getAliasOrColumnName();
if (hasArrayJoin(child))
columns_with_array_join[name] = { child, position };
else if (required_column_names.end() != std::find(required_column_names.begin(), required_column_names.end(), name))
other_required_columns_in_select[name] = { child, position };
}
++position;
}
/// Create a new expression list.
std::vector<ASTAndPosition> new_children;
for (const auto & name_child : other_required_columns_in_select)
new_children.push_back(name_child.second);
for (const auto & name_child : columns_with_array_join)
new_children.push_back(name_child.second);
for (const auto & name : required_column_names)
{
if (!other_required_columns_in_select.count(name) && !columns_with_array_join.count(name))
{
if (asterisk.first)
new_children.push_back({ std::make_shared<ASTIdentifier>(asterisk.first->range, name), asterisk.second });
else
throw Exception("SELECT query doesn't have required column: " + backQuoteIfNeed(name), ErrorCodes::THERE_IS_NO_COLUMN);
}
}
std::sort(new_children.begin(), new_children.end(), [](const auto & a, const auto & b) { return a.second < b.second; });
ASTPtr result = std::make_shared<ASTExpressionList>(); ASTPtr result = std::make_shared<ASTExpressionList>();
ASTs asts = select_expression_list->children;
/// Create a mapping. for (const auto & child : new_children)
result->children.push_back(child.first);
/// The element of mapping. /// Replace expression list in the query.
struct Arrow
{
Arrow() = default;
explicit Arrow(size_t to_position_) :
to_position(to_position_), is_selected(true)
{
}
size_t to_position = 0;
bool is_selected = false;
};
/// Mapping of one SELECT expression to another.
using Mapping = std::vector<Arrow>;
Mapping mapping(asts.size());
/// On which position in the SELECT expression is the corresponding column from `column_names`.
std::vector<size_t> positions_of_required_columns(required_column_names.size());
/// We will not throw out expressions that contain the `arrayJoin` function.
for (size_t i = 0; i < asts.size(); ++i)
{
if (hasArrayJoin(asts[i]))
mapping[i] = Arrow(i);
}
for (size_t i = 0; i < required_column_names.size(); ++i)
{
size_t j = 0;
for (; j < asts.size(); ++j)
{
if (asts[j]->getAliasOrColumnName() == required_column_names[i])
{
positions_of_required_columns[i] = j;
break;
}
}
if (j == asts.size())
throw Exception("Error while rewriting expression list for select query."
" Could not find alias: " + required_column_names[i],
DB::ErrorCodes::UNKNOWN_IDENTIFIER);
}
std::vector<size_t> positions_of_required_columns_in_subquery_order = positions_of_required_columns;
std::sort(positions_of_required_columns_in_subquery_order.begin(), positions_of_required_columns_in_subquery_order.end());
for (size_t i = 0; i < required_column_names.size(); ++i)
mapping[positions_of_required_columns_in_subquery_order[i]] = Arrow(positions_of_required_columns[i]);
/// Construct a new expression.
for (const auto & arrow : mapping)
{
if (arrow.is_selected)
result->children.push_back(asts[arrow.to_position]->clone());
}
for (auto & child : children) for (auto & child : children)
{ {
@ -147,9 +153,9 @@ void ASTSelectQuery::rewriteSelectExpressionList(const Names & required_column_n
select_expression_list = result; select_expression_list = result;
/** NOTE: It might seem that we could spoil the query by throwing an expression with an alias that is used somewhere else. /** NOTE: It might seem that we could spoil the query by throwing an expression with an alias that is used somewhere else.
* This can not happen, because this method is always called for a query, for which ExpressionAnalyzer was created at least once, * This can not happen, because this method is always called for a query, for which ExpressionAnalyzer was created at least once,
* which ensures that all aliases in it are already set. Not quite obvious logic. * which ensures that all aliases in it are already set. Not quite obvious logic.
*/ */
} }
ASTPtr ASTSelectQuery::clone() const ASTPtr ASTSelectQuery::clone() const

View File

@ -32,6 +32,7 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <DataStreams/RemoteBlockInputStream.h> #include <DataStreams/RemoteBlockInputStream.h>
@ -64,10 +65,10 @@ public:
const String & host_, UInt16 port_, const String & default_database_, const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_, const String & stage, const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_, bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, const Settings & settings_) const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_)
: :
concurrency(concurrency_), delay(delay_), queue(concurrency), concurrency(concurrency_), delay(delay_), queue(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_), connections(concurrency, host_, port_, default_database_, user_, password_, timeouts),
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_), randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency) json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
{ {
@ -482,6 +483,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["iterations"].as<size_t>(), options["iterations"].as<size_t>(),
options["timelimit"].as<double>(), options["timelimit"].as<double>(),
options["json"].as<std::string>(), options["json"].as<std::string>(),
ConnectionTimeouts::getTCPTimeouts(settings),
settings); settings);
} }
catch (...) catch (...)

View File

@ -61,9 +61,9 @@ if (CLICKHOUSE_SPLIT_BINARY)
target_link_libraries (clickhouse-performance-test clickhouse-performance-test-lib dbms) target_link_libraries (clickhouse-performance-test clickhouse-performance-test-lib dbms)
add_executable (clickhouse-extract-from-config clickhouse-extract-from-config.cpp) add_executable (clickhouse-extract-from-config clickhouse-extract-from-config.cpp)
target_link_libraries (clickhouse-extract-from-config clickhouse-extract-from-config-lib) target_link_libraries (clickhouse-extract-from-config clickhouse-extract-from-config-lib)
# now in utils # Also in utils
#add_executable (clickhouse-compressor clickhouse-compressor.cpp) add_executable (clickhouse-compressor clickhouse-compressor.cpp)
#target_link_libraries (clickhouse-compressor clickhouse-compressor-lib) target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
add_executable (clickhouse-format clickhouse-format.cpp) add_executable (clickhouse-format clickhouse-format.cpp)
target_link_libraries (clickhouse-format clickhouse-format-lib dbms) target_link_libraries (clickhouse-format clickhouse-format-lib dbms)
@ -108,6 +108,7 @@ else ()
add_custom_target (clickhouse-benchmark ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-benchmark DEPENDS clickhouse) add_custom_target (clickhouse-benchmark ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-benchmark DEPENDS clickhouse)
add_custom_target (clickhouse-performance-test ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-performance-test DEPENDS clickhouse) add_custom_target (clickhouse-performance-test ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-performance-test DEPENDS clickhouse)
add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse) add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse)
add_custom_target (clickhouse-compressor ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-compressor DEPENDS clickhouse)
add_custom_target (clickhouse-format ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-format DEPENDS clickhouse) add_custom_target (clickhouse-format ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-format DEPENDS clickhouse)
# install always because depian package want this files: # install always because depian package want this files:
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse) add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)
@ -121,12 +122,13 @@ else ()
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-benchmark ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-benchmark
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-performance-test ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-performance-test
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-compressor
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-clang ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-clang
${CMAKE_CURRENT_BINARY_DIR}/clickhouse-lld ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-lld
DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
add_custom_target (clickhouse-bundle ALL DEPENDS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test clickhouse-extract-from-config clickhouse-format clickhouse-clang clickhouse-lld) add_custom_target (clickhouse-bundle ALL DEPENDS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-clang clickhouse-lld)
endif () endif ()
install ( install (

View File

@ -357,8 +357,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
{ {
Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("http_port")); Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("http_port"));
Poco::Net::ServerSocket http_socket(http_socket_address); Poco::Net::ServerSocket http_socket(http_socket_address);
http_socket.setReceiveTimeout(settings.receive_timeout); http_socket.setReceiveTimeout(settings.http_receive_timeout);
http_socket.setSendTimeout(settings.send_timeout); http_socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer( servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPHandlerFactory(*this, "HTTPHandler-factory"), new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
@ -376,8 +376,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::call_once(ssl_init_once, SSLInit); std::call_once(ssl_init_once, SSLInit);
Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("https_port")); Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("https_port"));
Poco::Net::SecureServerSocket http_socket(http_socket_address); Poco::Net::SecureServerSocket http_socket(http_socket_address);
http_socket.setReceiveTimeout(settings.receive_timeout); http_socket.setReceiveTimeout(settings.http_receive_timeout);
http_socket.setSendTimeout(settings.send_timeout); http_socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer( servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPHandlerFactory(*this, "HTTPHandler-factory"), new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
@ -438,8 +438,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
{ {
Poco::Net::SocketAddress interserver_address = make_socket_address(listen_host, config().getInt("interserver_http_port")); Poco::Net::SocketAddress interserver_address = make_socket_address(listen_host, config().getInt("interserver_http_port"));
Poco::Net::ServerSocket interserver_io_http_socket(interserver_address); Poco::Net::ServerSocket interserver_io_http_socket(interserver_address);
interserver_io_http_socket.setReceiveTimeout(settings.receive_timeout); interserver_io_http_socket.setReceiveTimeout(settings.http_receive_timeout);
interserver_io_http_socket.setSendTimeout(settings.send_timeout); interserver_io_http_socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer( servers.emplace_back(new Poco::Net::HTTPServer(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"), new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool, server_pool,

View File

@ -1,4 +1,5 @@
<yandex> <yandex>
<path replace="replace">./</path> <path replace="replace">./</path>
<tmp_path replace="replace">./tmp/</tmp_path> <tmp_path replace="replace">./tmp/</tmp_path>
<format_schema_path replace="replace">./format_schemas/</format_schema_path>
</yandex> </yandex>

View File

@ -326,5 +326,5 @@
<!-- Directory in <clickhouse-path> containing schema files for various input formats. <!-- Directory in <clickhouse-path> containing schema files for various input formats.
The directory will be created if it doesn't exist. The directory will be created if it doesn't exist.
--> -->
<format_schema_path>format_schemas/</format_schema_path> <format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
</yandex> </yandex>

View File

@ -150,13 +150,14 @@ void StorageDistributedDirectoryMonitor::run()
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage) ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{ {
const auto pool_factory = [&storage, &name] (const std::string & host, const UInt16 port, auto timeouts = ConnectionTimeouts::getTCPTimeouts(storage.context.getSettingsRef());
const auto pool_factory = [&storage, &name, &timeouts] (const std::string & host, const UInt16 port,
const std::string & user, const std::string & password, const std::string & user, const std::string & password,
const std::string & default_database) const std::string & default_database)
{ {
return std::make_shared<ConnectionPool>( return std::make_shared<ConnectionPool>(
1, host, port, default_database, 1, host, port, default_database,
user, password, user, password, timeouts,
storage.getName() + '_' + name); storage.getName() + '_' + name);
}; };

View File

@ -155,6 +155,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & replica_path, const String & replica_path,
const String & host, const String & host,
int port, int port,
const ConnectionTimeouts & timeouts,
bool to_detached) bool to_detached)
{ {
Poco::URI uri; Poco::URI uri;
@ -168,7 +169,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{"compress", "false"} {"compress", "false"}
}); });
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST}; ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts};
static const String TMP_PREFIX = "tmp_fetch_"; static const String TMP_PREFIX = "tmp_fetch_";
String relative_part_path = String(to_detached ? "detached/" : "") + TMP_PREFIX + part_name; String relative_part_path = String(to_detached ? "detached/" : "") + TMP_PREFIX + part_name;

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <IO/HashingWriteBuffer.h> #include <IO/HashingWriteBuffer.h>
#include <IO/copyData.h> #include <IO/copyData.h>
#include <IO/ConnectionTimeouts.h>
namespace DB namespace DB
@ -51,6 +52,7 @@ public:
const String & replica_path, const String & replica_path,
const String & host, const String & host,
int port, int port,
const ConnectionTimeouts & timeouts,
bool to_detached = false); bool to_detached = false);
/// You need to stop the data transfer. /// You need to stop the data transfer.

View File

@ -276,6 +276,7 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
if (sign_column.empty()) if (sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
bool miss_column = true;
for (const auto & column : columns) for (const auto & column : columns)
{ {
if (column.name == sign_column) if (column.name == sign_column)
@ -284,9 +285,12 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
throw Exception("Sign column (" + sign_column + ")" throw Exception("Sign column (" + sign_column + ")"
" for storage CollapsingMergeTree must have type Int8." " for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break; break;
} }
} }
if (miss_column)
throw Exception("Sign column " + sign_column + " does not exist in table declaration.");
} }
else if (!sign_column.empty()) else if (!sign_column.empty())
throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR); throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR);
@ -311,6 +315,7 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.", throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
bool miss_column = true;
for (const auto & column : columns) for (const auto & column : columns)
{ {
if (column.name == version_column) if (column.name == version_column)
@ -324,9 +329,12 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
throw Exception("Version column (" + version_column + ")" throw Exception("Version column (" + version_column + ")"
" for storage ReplacingMergeTree must have type of UInt family or Date or DateTime." " for storage ReplacingMergeTree must have type of UInt family or Date or DateTime."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break; break;
} }
} }
if (miss_column)
throw Exception("Version column " + version_column + " does not exist in table declaration.");
} }
/// TODO Checks for Graphite mode. /// TODO Checks for Graphite mode.

View File

@ -273,19 +273,27 @@ MergeTreeDataPartChecksums MergeTreeDataPartChecksums::parse(const String & s)
return res; return res;
} }
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetChecksum(const String & name, const String & ext) const
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const
{ {
if (checksums.empty()) if (checksums.empty())
return nullptr; return nullptr;
const auto & files = checksums.files; const auto & files = checksums.files;
const auto bin_file_name = escapeForFileName(name) + ".bin"; const auto file_name = escapeForFileName(name) + ext;
auto it = files.find(bin_file_name); auto it = files.find(file_name);
return (it == files.end()) ? nullptr : &it->second; return (it == files.end()) ? nullptr : &it->second;
} }
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const
{
return tryGetChecksum(name, ".bin");
}
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetMrkChecksum(const String & name) const
{
return tryGetChecksum(name, ".mrk");
}
static ReadBufferFromFile openForReading(const String & path) static ReadBufferFromFile openForReading(const String & path)
{ {
@ -398,6 +406,13 @@ size_t MergeTreeDataPart::getColumnUncompressedSize(const String & name) const
} }
size_t MergeTreeDataPart::getColumnMrkSize(const String & name) const
{
const Checksum * checksum = tryGetMrkChecksum(name);
return checksum ? checksum->file_size : 0;
}
/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
* If no checksums are present returns the name of the first physically existing column. * If no checksums are present returns the name of the first physically existing column.
*/ */
@ -926,6 +941,18 @@ size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
return res; return res;
} }
size_t MergeTreeDataPart::getTotalMrkSizeInBytes() const
{
size_t res = 0;
for (const NameAndTypePair & it : columns)
{
const Checksum * checksum = tryGetMrkChecksum(it.name);
if (checksum)
res += checksum->file_size;
}
return res;
}
String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state) String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
{ {
switch (state) switch (state)

View File

@ -97,12 +97,17 @@ struct MergeTreeDataPart
MergeTreeDataPart(MergeTreeData & storage_, const String & name_); MergeTreeDataPart(MergeTreeData & storage_, const String & name_);
const Checksum * tryGetChecksum(const String & name, const String & ext) const;
/// Returns checksum of column's binary file. /// Returns checksum of column's binary file.
const Checksum * tryGetBinChecksum(const String & name) const; const Checksum * tryGetBinChecksum(const String & name) const;
/// Returns checksum of column's mrk file.
const Checksum * tryGetMrkChecksum(const String & name) const;
/// Returns the size of .bin file for column `name` if found, zero otherwise /// Returns the size of .bin file for column `name` if found, zero otherwise
size_t getColumnCompressedSize(const String & name) const; size_t getColumnCompressedSize(const String & name) const;
size_t getColumnUncompressedSize(const String & name) const; size_t getColumnUncompressedSize(const String & name) const;
/// Returns the size of .mrk file for column `name` if found, zero otherwise
size_t getColumnMrkSize(const String & name) const;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column. /// If no checksums are present returns the name of the first physically existing column.
@ -294,6 +299,8 @@ struct MergeTreeDataPart
/// For data in RAM ('index') /// For data in RAM ('index')
size_t getIndexSizeInBytes() const; size_t getIndexSizeInBytes() const;
size_t getIndexSizeInAllocatedBytes() const; size_t getIndexSizeInAllocatedBytes() const;
/// Total size of *.mrk files
size_t getTotalMrkSizeInBytes() const;
private: private:
/// Reads columns names and types from columns.txt /// Reads columns names and types from columns.txt

View File

@ -20,6 +20,7 @@
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/InterpreterAlterQuery.h> #include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
@ -2124,8 +2125,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
Stopwatch stopwatch; Stopwatch stopwatch;
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
part_name, replica_path, address.host, address.replication_port, to_detached); part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
if (!to_detached) if (!to_detached)
{ {
@ -2363,7 +2365,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
if (!partition) if (!partition)
{ {
selected = merger.selectPartsToMerge( selected = merger.selectPartsToMerge(
future_merged_part, false, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge); future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge);
} }
else else
{ {
@ -3094,11 +3096,12 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
/// NOTE Works only if there is access from the default user without a password. You can fix it by adding a parameter to the server config. /// NOTE Works only if there is access from the default user without a password. You can fix it by adding a parameter to the server config.
auto timeouts = ConnectionTimeouts::getTCPTimeouts(context.getSettingsRef());
Connection connection( Connection connection(
leader_address.host, leader_address.host,
leader_address.queries_port, leader_address.queries_port,
leader_address.database, leader_address.database,
"", "", "ClickHouse replica"); "", "", timeouts, "ClickHouse replica");
RemoteBlockInputStream stream(connection, formattedAST(new_query), context, &settings); RemoteBlockInputStream stream(connection, formattedAST(new_query), context, &settings);
NullBlockOutputStream output; NullBlockOutputStream output;

View File

@ -14,10 +14,8 @@
namespace DB namespace DB
{ {
StorageSystemParts::StorageSystemParts(const std::string & name)
StorageSystemParts::StorageSystemParts(const std::string & name_) : StorageSystemPartsBase(name,
: name(name_),
columns
{ {
{"partition", std::make_shared<DataTypeString>()}, {"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()}, {"name", std::make_shared<DataTypeString>()},
@ -41,265 +39,52 @@ StorageSystemParts::StorageSystemParts(const std::string & name_)
{"table", std::make_shared<DataTypeString>()}, {"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()} {"engine", std::make_shared<DataTypeString>()}
} }
)
{ {
} }
void StorageSystemParts::processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column)
BlockInputStreams StorageSystemParts::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{ {
bool has_state_column = false; using State = MergeTreeDataPart::State;
Names real_column_names;
for (const String & column_name : column_names) for (size_t part_number = 0; part_number < info.all_parts.size(); ++part_number)
{ {
if (column_name == "_state") const auto & part = info.all_parts[part_number];
has_state_column = true; auto part_state = info.all_parts_state[part_number];
else
real_column_names.emplace_back(column_name); size_t i = 0;
{
WriteBufferFromOwnString out;
part->partition.serializeTextQuoted(*info.data, out);
columns[i++]->insert(out.str());
}
columns[i++]->insert(part->name);
columns[i++]->insert(static_cast<UInt64>(part_state == State::Committed));
columns[i++]->insert(static_cast<UInt64>(part->marks_count));
columns[i++]->insert(static_cast<UInt64>(part->getTotalMrkSizeInBytes()));
columns[i++]->insert(static_cast<UInt64>(part->rows_count));
columns[i++]->insert(static_cast<UInt64>(part->size_in_bytes));
columns[i++]->insert(static_cast<UInt64>(part->modification_time));
columns[i++]->insert(static_cast<UInt64>(part->remove_time));
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.
columns[i++]->insert(static_cast<UInt64>(part.use_count() - 1));
columns[i++]->insert(static_cast<UInt64>(part->getMinDate()));
columns[i++]->insert(static_cast<UInt64>(part->getMaxDate()));
columns[i++]->insert(part->info.min_block);
columns[i++]->insert(part->info.max_block);
columns[i++]->insert(static_cast<UInt64>(part->info.level));
columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInBytes()));
columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInAllocatedBytes()));
columns[i++]->insert(info.database);
columns[i++]->insert(info.table);
columns[i++]->insert(info.engine);
if (has_state_column)
columns[i++]->insert(part->stateString());
} }
/// Do not check if only _state column is requested
if (!(has_state_column && real_column_names.empty()))
check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
/// Will apply WHERE to subset of columns and then add more columns.
/// This is kind of complicated, but we use WHERE to do less work.
Block block_to_filter;
std::map<std::pair<String, String>, StoragePtr> storages;
{
Databases databases = context.getDatabases();
/// Add column 'database'.
MutableColumnPtr database_column_mut = ColumnString::create();
for (const auto & database : databases)
database_column_mut->insert(database.first);
block_to_filter.insert(ColumnWithTypeAndName(std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
/// Filter block_to_filter with column 'database'.
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
if (!block_to_filter.rows())
return BlockInputStreams();
/// Add columns 'table', 'engine', 'active'
ColumnPtr database_column = block_to_filter.getByName("database").column;
size_t rows = database_column->size();
IColumn::Offsets offsets(rows);
MutableColumnPtr table_column_mut = ColumnString::create();
MutableColumnPtr engine_column_mut = ColumnString::create();
MutableColumnPtr active_column_mut = ColumnUInt8::create();
for (size_t i = 0; i < rows; ++i)
{
String database_name = (*database_column)[i].get<String>();
const DatabasePtr database = databases.at(database_name);
offsets[i] = i ? offsets[i - 1] : 0;
for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next())
{
String table_name = iterator->name();
StoragePtr storage = iterator->table();
String engine_name = storage->getName();
if (!dynamic_cast<StorageMergeTree *>(&*storage) &&
!dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
continue;
storages[std::make_pair(database_name, iterator->name())] = storage;
/// Add all combinations of flag 'active'.
for (UInt64 active : {0, 1})
{
table_column_mut->insert(table_name);
engine_column_mut->insert(engine_name);
active_column_mut->insert(active);
}
offsets[i] += 2;
}
}
for (size_t i = 0; i < block_to_filter.columns(); ++i)
{
ColumnPtr & column = block_to_filter.safeGetByPosition(i).column;
column = column->replicate(offsets);
}
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
}
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
/// If all was filtered out.
if (!block_to_filter.rows())
return {};
ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
ColumnPtr filtered_table_column = block_to_filter.getByName("table").column;
ColumnPtr filtered_active_column = block_to_filter.getByName("active").column;
/// Finally, create the result.
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
if (has_state_column)
res_columns.push_back(ColumnString::create());
for (size_t i = 0; i < filtered_database_column->size();)
{
String database = (*filtered_database_column)[i].get<String>();
String table = (*filtered_table_column)[i].get<String>();
/// What 'active' value we need.
bool need[2]{}; /// [active]
for (; i < filtered_database_column->size() &&
(*filtered_database_column)[i].get<String>() == database &&
(*filtered_table_column)[i].get<String>() == table; ++i)
{
bool active = !!(*filtered_active_column)[i].get<UInt64>();
need[active] = true;
}
StoragePtr storage = storages.at(std::make_pair(database, table));
TableStructureReadLockPtr table_lock;
try
{
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); /// For table not to be dropped.
}
catch (const Exception & e)
{
/** There are case when IStorage::drop was called,
* but we still own the object.
* Then table will throw exception at attempt to lock it.
* Just skip the table.
*/
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
continue;
throw;
}
String engine = storage->getName();
MergeTreeData * data = nullptr;
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
{
data = &merge_tree->getData();
}
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
{
data = &replicated_merge_tree->getData();
}
else
{
throw Exception("Unknown engine " + engine, ErrorCodes::LOGICAL_ERROR);
}
using State = MergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
if (need[0])
{
/// If has_state_column is requested, return all states
if (!has_state_column)
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
else
all_parts = data->getAllDataPartsVector(&all_parts_state);
}
else
all_parts = data->getDataPartsVector({State::Committed}, &all_parts_state);
/// Finally, we'll go through the list of parts.
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
{
const auto & part = all_parts[part_number];
auto part_state = all_parts_state[part_number];
size_t i = 0;
{
WriteBufferFromOwnString out;
part->partition.serializeTextQuoted(*data, out);
res_columns[i++]->insert(out.str());
}
res_columns[i++]->insert(part->name);
res_columns[i++]->insert(static_cast<UInt64>(part_state == State::Committed));
res_columns[i++]->insert(static_cast<UInt64>(part->marks_count));
size_t marks_size = 0;
for (const NameAndTypePair & it : part->columns)
{
String name = escapeForFileName(it.name);
auto checksum = part->checksums.files.find(name + ".mrk");
if (checksum != part->checksums.files.end())
marks_size += checksum->second.file_size;
}
res_columns[i++]->insert(static_cast<UInt64>(marks_size));
res_columns[i++]->insert(static_cast<UInt64>(part->rows_count));
res_columns[i++]->insert(static_cast<UInt64>(part->size_in_bytes));
res_columns[i++]->insert(static_cast<UInt64>(part->modification_time));
res_columns[i++]->insert(static_cast<UInt64>(part->remove_time));
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.
res_columns[i++]->insert(static_cast<UInt64>(part.use_count() - 1));
res_columns[i++]->insert(static_cast<UInt64>(part->getMinDate()));
res_columns[i++]->insert(static_cast<UInt64>(part->getMaxDate()));
res_columns[i++]->insert(part->info.min_block);
res_columns[i++]->insert(part->info.max_block);
res_columns[i++]->insert(static_cast<UInt64>(part->info.level));
res_columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInBytes()));
res_columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInAllocatedBytes()));
res_columns[i++]->insert(database);
res_columns[i++]->insert(table);
res_columns[i++]->insert(engine);
if (has_state_column)
res_columns[i++]->insert(part->stateString());
}
}
Block block = getSampleBlock();
if (has_state_column)
block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block.cloneWithColumns(std::move(res_columns))));
} }
NameAndTypePair StorageSystemParts::getColumn(const String & column_name) const
{
if (column_name == "_state")
return NameAndTypePair("_state", std::make_shared<DataTypeString>());
return ITableDeclaration::getColumn(column_name);
}
bool StorageSystemParts::hasColumn(const String & column_name) const
{
if (column_name == "_state")
return true;
return ITableDeclaration::hasColumn(column_name);
}
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <ext/shared_ptr_helper.h> #include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h> #include <Storages/System/StorageSystemPartsBase.h>
namespace DB namespace DB
@ -12,32 +12,14 @@ class Context;
/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family. /** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
*/ */
class StorageSystemParts : public ext::shared_ptr_helper<StorageSystemParts>, public IStorage class StorageSystemParts : public ext::shared_ptr_helper<StorageSystemParts>, public StorageSystemPartsBase
{ {
public: public:
std::string getName() const override { return "SystemParts"; } std::string getName() const override { return "SystemParts"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
NamesAndTypesList columns;
protected: protected:
StorageSystemParts(const std::string & name_); explicit StorageSystemParts(const std::string & name);
void processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column) override;
}; };
} }

View File

@ -0,0 +1,280 @@
#include <Storages/System/StorageSystemPartsBase.h>
#include <Common/escapeForFileName.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
namespace DB
{
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names)
{
bool has_state_column = false;
Names real_column_names;
for (const String & column_name : column_names)
{
if (column_name == "_state")
has_state_column = true;
else
real_column_names.emplace_back(column_name);
}
/// Do not check if only _state column is requested
if (!(has_state_column && real_column_names.empty()))
check(real_column_names);
return has_state_column;
}
class StoragesInfoStream
{
public:
StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context, bool has_state_column)
: has_state_column(has_state_column)
{
/// Will apply WHERE to subset of columns and then add more columns.
/// This is kind of complicated, but we use WHERE to do less work.
Block block_to_filter;
MutableColumnPtr table_column_mut = ColumnString::create();
MutableColumnPtr engine_column_mut = ColumnString::create();
MutableColumnPtr active_column_mut = ColumnUInt8::create();
{
Databases databases = context.getDatabases();
/// Add column 'database'.
MutableColumnPtr database_column_mut = ColumnString::create();
for (const auto & database : databases)
database_column_mut->insert(database.first);
block_to_filter.insert(ColumnWithTypeAndName(
std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
/// Filter block_to_filter with column 'database'.
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
rows = block_to_filter.rows();
/// Block contains new columns, update database_column.
ColumnPtr database_column = block_to_filter.getByName("database").column;
if (rows)
{
/// Add columns 'table', 'engine', 'active'
IColumn::Offsets offsets(rows);
for (size_t i = 0; i < rows; ++i)
{
String database_name = (*database_column)[i].get<String>();
const DatabasePtr database = databases.at(database_name);
offsets[i] = i ? offsets[i - 1] : 0;
for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next())
{
String table_name = iterator->name();
StoragePtr storage = iterator->table();
String engine_name = storage->getName();
if (!dynamic_cast<StorageMergeTree *>(&*storage) &&
!dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
continue;
storages[std::make_pair(database_name, iterator->name())] = storage;
/// Add all combinations of flag 'active'.
for (UInt64 active : {0, 1})
{
table_column_mut->insert(table_name);
engine_column_mut->insert(engine_name);
active_column_mut->insert(active);
}
offsets[i] += 2;
}
}
for (size_t i = 0; i < block_to_filter.columns(); ++i)
{
ColumnPtr & column = block_to_filter.safeGetByPosition(i).column;
column = column->replicate(offsets);
}
}
}
if (rows)
{
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
rows = block_to_filter.rows();
}
database_column = block_to_filter.getByName("database").column;
table_column = block_to_filter.getByName("table").column;
active_column = block_to_filter.getByName("active").column;
next_row = 0;
}
StorageSystemPartsBase::StoragesInfo next()
{
StorageSystemPartsBase::StoragesInfo info;
info.storage = nullptr;
while (next_row < rows)
{
info.database = (*database_column)[next_row].get<String>();
info.table = (*table_column)[next_row].get<String>();
auto isSameTable = [& info, this] (size_t next_row) -> bool
{
return (*database_column)[next_row].get<String>() == info.database &&
(*table_column)[next_row].get<String>() == info.table;
};
/// What 'active' value we need.
bool need[2]{}; /// [active]
for (; next_row < rows && isSameTable(next_row); ++next_row)
{
bool active = (*active_column)[next_row].get<UInt64>() != 0;
need[active] = true;
}
info.storage = storages.at(std::make_pair(info.database, info.table));
try
{
/// For table not to be dropped.
info.table_lock = info.storage->lockStructure(false, __PRETTY_FUNCTION__);
}
catch (const Exception & e)
{
/** There are case when IStorage::drop was called,
* but we still own the object.
* Then table will throw exception at attempt to lock it.
* Just skip the table.
*/
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
continue;
throw;
}
info.engine = info.storage->getName();
info.data = nullptr;
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*info.storage))
{
info.data = &merge_tree->getData();
}
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*info.storage))
{
info.data = &replicated_merge_tree->getData();
}
else
{
throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR);
}
using State = MergeTreeDataPart::State;
auto & all_parts_state = info.all_parts_state;
auto & all_parts = info.all_parts;
if (need[0])
{
/// If has_state_column is requested, return all states.
if (!has_state_column)
all_parts = info.data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
else
all_parts = info.data->getAllDataPartsVector(&all_parts_state);
}
else
all_parts = info.data->getDataPartsVector({State::Committed}, &all_parts_state);
break;
}
return info;
}
private:
bool has_state_column;
ColumnPtr database_column;
ColumnPtr table_column;
ColumnPtr active_column;
size_t next_row;
size_t rows;
using StoragesMap = std::map<std::pair<String, String>, StoragePtr>;
StoragesMap storages;
};
BlockInputStreams StorageSystemPartsBase::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
bool has_state_column = hasStateColumn(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
StoragesInfoStream stream(query_info, context, has_state_column);
/// Create the result.
MutableColumns columns = getSampleBlock().cloneEmptyColumns();
if (has_state_column)
columns.push_back(ColumnString::create());
while (StoragesInfo info = stream.next())
{
processNextStorage(columns, info, has_state_column);
}
Block block = getSampleBlock();
if (has_state_column)
block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block.cloneWithColumns(std::move(columns))));
}
NameAndTypePair StorageSystemPartsBase::getColumn(const String & column_name) const
{
if (column_name == "_state")
return NameAndTypePair("_state", std::make_shared<DataTypeString>());
return ITableDeclaration::getColumn(column_name);
}
bool StorageSystemPartsBase::hasColumn(const String & column_name) const
{
if (column_name == "_state")
return true;
return ITableDeclaration::hasColumn(column_name);
}
}

View File

@ -0,0 +1,63 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class Context;
/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
*/
class StorageSystemPartsBase : public IStorage
{
public:
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
struct StoragesInfo
{
StoragePtr storage;
TableStructureReadLockPtr table_lock;
String database;
String table;
String engine;
MergeTreeData * data;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
operator bool() const { return storage != nullptr; }
};
private:
const std::string name;
NamesAndTypesList columns;
bool hasStateColumn(const Names & column_names);
protected:
StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns) : name(std::move(name_)), columns(columns) {}
virtual void processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column) = 0;
};
}

View File

@ -0,0 +1,154 @@
#include <Common/escapeForFileName.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemPartsColumns.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
namespace DB
{
StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name)
: StorageSystemPartsBase(name,
{
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
{"marks_bytes_in_block", std::make_shared<DataTypeUInt64>()},
{"rows", std::make_shared<DataTypeUInt64>()},
{"bytes", std::make_shared<DataTypeUInt64>()},
{"modification_time", std::make_shared<DataTypeDateTime>()},
{"remove_time", std::make_shared<DataTypeDateTime>()},
{"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()},
{"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"column", std::make_shared<DataTypeString>()},
{ "type", std::make_shared<DataTypeString>() },
{ "default_kind", std::make_shared<DataTypeString>() },
{ "default_expression", std::make_shared<DataTypeString>() },
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "marks_bytes_in_column", std::make_shared<DataTypeUInt64>() },
}
)
{
}
void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column)
{
/// Prepare information about columns in storage.
struct ColumnInfo
{
String default_kind;
String default_expression;
};
NamesAndTypesList columns_list = info.storage->getColumnsList();
columns_list.insert(std::end(columns_list), std::begin(info.storage->alias_columns), std::end(info.storage->alias_columns));
column_defaults = info.storage->column_defaults;
std::unordered_map<String, ColumnInfo> columns_info;
for (const auto & column : columns_list)
{
ColumnInfo column_info;
const auto it = column_defaults.find(column.name);
if (it != std::end(column_defaults))
{
column_info.default_kind = toString(it->second.type);
column_info.default_expression = queryToString(it->second.expression);
}
columns_info[column.name] = column_info;
}
/// Go through the list of parts.
for (size_t part_number = 0; part_number < info.all_parts.size(); ++part_number)
{
const auto & part = info.all_parts[part_number];
auto part_state = info.all_parts_state[part_number];
auto total_mrk_size_in_bytes = part->getTotalMrkSizeInBytes();
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.
auto use_count = part.use_count() - 1;
auto min_date = part->getMinDate();
auto max_date = part->getMaxDate();
auto index_size_in_bytes = part->getIndexSizeInBytes();
auto index_size_in_allocated_bytes = part->getIndexSizeInAllocatedBytes();
using State = MergeTreeDataPart::State;
for (const auto & column : part->columns)
{
size_t j = 0;
{
WriteBufferFromOwnString out;
part->partition.serializeTextQuoted(*info.data, out);
columns[j++]->insert(out.str());
}
columns[j++]->insert(part->name);
columns[j++]->insert(static_cast<UInt64>(part_state == State::Committed));
columns[j++]->insert(static_cast<UInt64>(part->marks_count));
columns[j++]->insert(static_cast<UInt64>(total_mrk_size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(part->rows_count));
columns[j++]->insert(static_cast<UInt64>(part->size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(part->modification_time));
columns[j++]->insert(static_cast<UInt64>(part->remove_time));
columns[j++]->insert(static_cast<UInt64>(use_count));
columns[j++]->insert(static_cast<UInt64>(min_date));
columns[j++]->insert(static_cast<UInt64>(max_date));
columns[j++]->insert(part->info.min_block);
columns[j++]->insert(part->info.max_block);
columns[j++]->insert(static_cast<UInt64>(part->info.level));
columns[j++]->insert(static_cast<UInt64>(index_size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(index_size_in_allocated_bytes));
columns[j++]->insert(info.database);
columns[j++]->insert(info.table);
columns[j++]->insert(info.engine);
columns[j++]->insert(column.name);
columns[j++]->insert(column.type->getName());
auto column_info_it = columns_info.find(column.name);
if (column_info_it != columns_info.end())
{
columns[j++]->insert(column_info_it->second.default_kind);
columns[j++]->insert(column_info_it->second.default_expression);
}
else
{
columns[j++]->insertDefault();
columns[j++]->insertDefault();
}
columns[j++]->insert(part->getColumnCompressedSize(column.name));
columns[j++]->insert(part->getColumnUncompressedSize(column.name));
columns[j++]->insert(part->getColumnMrkSize(column.name));
if (has_state_column)
columns[j++]->insert(part->stateString());
}
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/StorageSystemPartsBase.h>
namespace DB
{
class Context;
/** Implements system table 'parts_columns' which allows to get information about
* columns in data parts for tables of MergeTree family.
*/
class StorageSystemPartsColumns
: public ext::shared_ptr_helper<StorageSystemPartsColumns>, public StorageSystemPartsBase
{
public:
std::string getName() const override { return "SystemPartsColumns"; }
protected:
StorageSystemPartsColumns(const std::string & name_);
void processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column) override;
};
}

View File

@ -16,6 +16,7 @@
#include <Storages/System/StorageSystemNumbers.h> #include <Storages/System/StorageSystemNumbers.h>
#include <Storages/System/StorageSystemOne.h> #include <Storages/System/StorageSystemOne.h>
#include <Storages/System/StorageSystemParts.h> #include <Storages/System/StorageSystemParts.h>
#include <Storages/System/StorageSystemPartsColumns.h>
#include <Storages/System/StorageSystemProcesses.h> #include <Storages/System/StorageSystemProcesses.h>
#include <Storages/System/StorageSystemReplicas.h> #include <Storages/System/StorageSystemReplicas.h>
#include <Storages/System/StorageSystemReplicationQueue.h> #include <Storages/System/StorageSystemReplicationQueue.h>
@ -45,6 +46,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
{ {
attachSystemTablesLocal(system_database); attachSystemTablesLocal(system_database);
system_database.attachTable("parts", StorageSystemParts::create("parts")); system_database.attachTable("parts", StorageSystemParts::create("parts"));
system_database.attachTable("parts_columns", StorageSystemPartsColumns::create("parts_columns"));
system_database.attachTable("processes", StorageSystemProcesses::create("processes")); system_database.attachTable("processes", StorageSystemProcesses::create("processes"));
system_database.attachTable("metrics", StorageSystemMetrics::create("metrics")); system_database.attachTable("metrics", StorageSystemMetrics::create("metrics"));
system_database.attachTable("merges", StorageSystemMerges::create("merges")); system_database.attachTable("merges", StorageSystemMerges::create("merges"));

View File

@ -7,19 +7,6 @@ else ()
include (${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake) include (${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake)
endif () endif ()
# Google Test from sources
add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
# avoid problems with <regexp.h>
target_compile_definitions (gtest INTERFACE GTEST_HAS_POSIX_RE=0)
target_include_directories (gtest INTERFACE ${ClickHouse_SOURCE_DIR}/contrib/googletest/include)
macro(grep_gtest_sources BASE_DIR DST_VAR)
# Cold match files that are not in tests/ directories
file(GLOB_RECURSE "${DST_VAR}" RELATIVE "${BASE_DIR}" "gtest*.cpp")
endmacro()
install (PROGRAMS clickhouse-test DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) install (PROGRAMS clickhouse-test DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
install ( install (
DIRECTORY queries performance external_dictionaries DIRECTORY queries performance external_dictionaries

View File

@ -0,0 +1,119 @@
<test>
<name>test_hits_agg_functions_max_min_any</name>
<type>loop</type>
<stop_conditions>
<all_of>
<iterations>3</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>5</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<preconditions>
<table_exists>default.hits_1000m</table_exists>
</preconditions>
<query>select min(Title) from hits_1000m where Title != '' group by intHash32(UserID) % 1000000</query>
<query>select max(Title) from hits_1000m where Title != '' group by intHash32(UserID) % 1000000</query>
<query>select any(Title) from hits_1000m where Title != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(Title) from hits_1000m where Title != '' group by intHash32(UserID) % 1000000</query>
<query>select min(URL) from hits_1000m where URL != '' group by intHash32(UserID) % 1000000</query>
<query>select max(URL) from hits_1000m where URL != '' group by intHash32(UserID) % 1000000</query>
<query>select any(URL) from hits_1000m where URL != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(URL) from hits_1000m where URL != '' group by intHash32(UserID) % 1000000</query>
<query>select min(Referer) from hits_1000m where Referer != '' group by intHash32(UserID) % 1000000</query>
<query>select max(Referer) from hits_1000m where Referer != '' group by intHash32(UserID) % 1000000</query>
<query>select any(Referer) from hits_1000m where Referer != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(Referer) from hits_1000m where Referer != '' group by intHash32(UserID) % 1000000</query>
<query>select min(FlashMinor2) from hits_1000m where FlashMinor2 != '' group by intHash32(UserID) % 1000000</query>
<query>select max(FlashMinor2) from hits_1000m where FlashMinor2 != '' group by intHash32(UserID) % 1000000</query>
<query>select any(FlashMinor2) from hits_1000m where FlashMinor2 != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(FlashMinor2) from hits_1000m where FlashMinor2 != '' group by intHash32(UserID) % 1000000</query>
<query>select min(MobilePhoneModel) from hits_1000m where MobilePhoneModel != '' group by intHash32(UserID) % 1000000</query>
<query>select max(MobilePhoneModel) from hits_1000m where MobilePhoneModel != '' group by intHash32(UserID) % 1000000</query>
<query>select any(MobilePhoneModel) from hits_1000m where MobilePhoneModel != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(MobilePhoneModel) from hits_1000m where MobilePhoneModel != '' group by intHash32(UserID) % 1000000</query>
<query>select min(Params) from hits_1000m where Params != '' group by intHash32(UserID) % 1000000</query>
<query>select max(Params) from hits_1000m where Params != '' group by intHash32(UserID) % 1000000</query>
<query>select any(Params) from hits_1000m where Params != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(Params) from hits_1000m where Params != '' group by intHash32(UserID) % 1000000</query>
<query>select min(SearchPhrase) from hits_1000m where SearchPhrase != '' group by intHash32(UserID) % 1000000</query>
<query>select max(SearchPhrase) from hits_1000m where SearchPhrase != '' group by intHash32(UserID) % 1000000</query>
<query>select any(SearchPhrase) from hits_1000m where SearchPhrase != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(SearchPhrase) from hits_1000m where SearchPhrase != '' group by intHash32(UserID) % 1000000</query>
<query>select min(PageCharset) from hits_1000m where PageCharset != '' group by intHash32(UserID) % 1000000</query>
<query>select max(PageCharset) from hits_1000m where PageCharset != '' group by intHash32(UserID) % 1000000</query>
<query>select any(PageCharset) from hits_1000m where PageCharset != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(PageCharset) from hits_1000m where PageCharset != '' group by intHash32(UserID) % 1000000</query>
<query>select min(OriginalURL) from hits_1000m where OriginalURL != '' group by intHash32(UserID) % 1000000</query>
<query>select max(OriginalURL) from hits_1000m where OriginalURL != '' group by intHash32(UserID) % 1000000</query>
<query>select any(OriginalURL) from hits_1000m where OriginalURL != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(OriginalURL) from hits_1000m where OriginalURL != '' group by intHash32(UserID) % 1000000</query>
<query>select min(SocialNetwork) from hits_1000m where SocialNetwork != '' group by intHash32(UserID) % 1000000</query>
<query>select max(SocialNetwork) from hits_1000m where SocialNetwork != '' group by intHash32(UserID) % 1000000</query>
<query>select any(SocialNetwork) from hits_1000m where SocialNetwork != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(SocialNetwork) from hits_1000m where SocialNetwork != '' group by intHash32(UserID) % 1000000</query>
<query>select min(SocialAction) from hits_1000m where SocialAction != '' group by intHash32(UserID) % 1000000</query>
<query>select max(SocialAction) from hits_1000m where SocialAction != '' group by intHash32(UserID) % 1000000</query>
<query>select any(SocialAction) from hits_1000m where SocialAction != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(SocialAction) from hits_1000m where SocialAction != '' group by intHash32(UserID) % 1000000</query>
<query>select min(SocialSourcePage) from hits_1000m where SocialSourcePage != '' group by intHash32(UserID) % 1000000</query>
<query>select max(SocialSourcePage) from hits_1000m where SocialSourcePage != '' group by intHash32(UserID) % 1000000</query>
<query>select any(SocialSourcePage) from hits_1000m where SocialSourcePage != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(SocialSourcePage) from hits_1000m where SocialSourcePage != '' group by intHash32(UserID) % 1000000</query>
<query>select min(ParamOrderID) from hits_1000m where ParamOrderID != '' group by intHash32(UserID) % 1000000</query>
<query>select max(ParamOrderID) from hits_1000m where ParamOrderID != '' group by intHash32(UserID) % 1000000</query>
<query>select any(ParamOrderID) from hits_1000m where ParamOrderID != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(ParamOrderID) from hits_1000m where ParamOrderID != '' group by intHash32(UserID) % 1000000</query>
<query>select min(OpenstatServiceName) from hits_1000m where OpenstatServiceName != '' group by intHash32(UserID) % 1000000</query>
<query>select max(OpenstatServiceName) from hits_1000m where OpenstatServiceName != '' group by intHash32(UserID) % 1000000</query>
<query>select any(OpenstatServiceName) from hits_1000m where OpenstatServiceName != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(OpenstatServiceName) from hits_1000m where OpenstatServiceName != '' group by intHash32(UserID) % 1000000</query>
<query>select min(OpenstatCampaignID) from hits_1000m where OpenstatCampaignID != '' group by intHash32(UserID) % 1000000</query>
<query>select max(OpenstatCampaignID) from hits_1000m where OpenstatCampaignID != '' group by intHash32(UserID) % 1000000</query>
<query>select any(OpenstatCampaignID) from hits_1000m where OpenstatCampaignID != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(OpenstatCampaignID) from hits_1000m where OpenstatCampaignID != '' group by intHash32(UserID) % 1000000</query>
<query>select min(OpenstatAdID) from hits_1000m where OpenstatAdID != '' group by intHash32(UserID) % 1000000</query>
<query>select max(OpenstatAdID) from hits_1000m where OpenstatAdID != '' group by intHash32(UserID) % 1000000</query>
<query>select any(OpenstatAdID) from hits_1000m where OpenstatAdID != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(OpenstatAdID) from hits_1000m where OpenstatAdID != '' group by intHash32(UserID) % 1000000</query>
<query>select min(OpenstatSourceID) from hits_1000m where OpenstatSourceID != '' group by intHash32(UserID) % 1000000</query>
<query>select max(OpenstatSourceID) from hits_1000m where OpenstatSourceID != '' group by intHash32(UserID) % 1000000</query>
<query>select any(OpenstatSourceID) from hits_1000m where OpenstatSourceID != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(OpenstatSourceID) from hits_1000m where OpenstatSourceID != '' group by intHash32(UserID) % 1000000</query>
<query>select min(UTMSource) from hits_1000m where UTMSource != '' group by intHash32(UserID) % 1000000</query>
<query>select max(UTMSource) from hits_1000m where UTMSource != '' group by intHash32(UserID) % 1000000</query>
<query>select any(UTMSource) from hits_1000m where UTMSource != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(UTMSource) from hits_1000m where UTMSource != '' group by intHash32(UserID) % 1000000</query>
<query>select min(UTMMedium) from hits_1000m where UTMMedium != '' group by intHash32(UserID) % 1000000</query>
<query>select max(UTMMedium) from hits_1000m where UTMMedium != '' group by intHash32(UserID) % 1000000</query>
<query>select any(UTMMedium) from hits_1000m where UTMMedium != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(UTMMedium) from hits_1000m where UTMMedium != '' group by intHash32(UserID) % 1000000</query>
<query>select min(UTMCampaign) from hits_1000m where UTMCampaign != '' group by intHash32(UserID) % 1000000</query>
<query>select max(UTMCampaign) from hits_1000m where UTMCampaign != '' group by intHash32(UserID) % 1000000</query>
<query>select any(UTMCampaign) from hits_1000m where UTMCampaign != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(UTMCampaign) from hits_1000m where UTMCampaign != '' group by intHash32(UserID) % 1000000</query>
<query>select min(UTMContent) from hits_1000m where UTMContent != '' group by intHash32(UserID) % 1000000</query>
<query>select max(UTMContent) from hits_1000m where UTMContent != '' group by intHash32(UserID) % 1000000</query>
<query>select any(UTMContent) from hits_1000m where UTMContent != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(UTMContent) from hits_1000m where UTMContent != '' group by intHash32(UserID) % 1000000</query>
<query>select min(UTMTerm) from hits_1000m where UTMTerm != '' group by intHash32(UserID) % 1000000</query>
<query>select max(UTMTerm) from hits_1000m where UTMTerm != '' group by intHash32(UserID) % 1000000</query>
<query>select any(UTMTerm) from hits_1000m where UTMTerm != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(UTMTerm) from hits_1000m where UTMTerm != '' group by intHash32(UserID) % 1000000</query>
<query>select min(FromTag) from hits_1000m where FromTag != '' group by intHash32(UserID) % 1000000</query>
<query>select max(FromTag) from hits_1000m where FromTag != '' group by intHash32(UserID) % 1000000</query>
<query>select any(FromTag) from hits_1000m where FromTag != '' group by intHash32(UserID) % 1000000</query>
<query>select anyHeavy(FromTag) from hits_1000m where FromTag != '' group by intHash32(UserID) % 1000000</query>
</test>

View File

@ -0,0 +1,11 @@
0
1
2
3
4
5
6
7
8
9
10

View File

@ -0,0 +1,2 @@
SET max_block_size = 10;
SELECT * FROM (select toUInt64(1) s limit 1) any right join (select number s from numbers(11)) using (s) ORDER BY s;

View File

@ -0,0 +1,12 @@
1
1
1
1
1
1
1
1
1
1 2
1 2
1 2

View File

@ -0,0 +1,6 @@
SELECT a FROM (SELECT 1 AS a, 2 AS b);
SELECT a FROM (SELECT 1 AS a, arrayJoin([2, 3]) AS b);
SELECT a FROM (SELECT 1 AS a, arrayJoin([2, 3]), arrayJoin([2, 3]));
SELECT a FROM (SELECT 1 AS a, arrayJoin([2, 3]), arrayJoin([4, 5]));
SELECT a, b FROM (SELECT a, * FROM (SELECT 1 AS a, 2 AS b, 3 AS c));
SELECT a, b FROM (SELECT a, *, arrayJoin(c) FROM (SELECT 1 AS a, 2 AS b, [3, 4] AS c));

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (1.1.54330) unstable; urgency=low clickhouse (1.1.54331) unstable; urgency=low
* Modified source code * Modified source code
-- <robot-metrika-test@yandex-team.ru> Thu, 28 Dec 2017 18:27:14 +0300 -- <robot-metrika-test@yandex-team.ru> Tue, 09 Jan 2018 10:46:57 +0300

View File

@ -1,4 +1,6 @@
/usr/bin/clickhouse-client /usr/bin/clickhouse-client
/usr/bin/clickhouse-local /usr/bin/clickhouse-local
/usr/bin/clickhouse-compressor
/usr/bin/clickhouse-benchmark
/etc/clickhouse-client/config.xml /etc/clickhouse-client/config.xml
/usr/bin/clickhouse-extract-from-config /usr/bin/clickhouse-extract-from-config

View File

@ -1 +0,0 @@
/usr/bin/clickhouse-compressor

View File

@ -1,4 +1,3 @@
usr/bin/clickhouse-test usr/bin/clickhouse-test
usr/share/clickhouse-test/* usr/share/clickhouse-test/*
usr/bin/clickhouse-performance-test usr/bin/clickhouse-performance-test
usr/bin/clickhouse-benchmark

17
debian/control vendored
View File

@ -1,4 +1,5 @@
Source: clickhouse Source: clickhouse
Section: database
Priority: optional Priority: optional
Maintainer: Alexey Milovidov <milovidov@yandex-team.ru> Maintainer: Alexey Milovidov <milovidov@yandex-team.ru>
Build-Depends: debhelper (>= 9), Build-Depends: debhelper (>= 9),
@ -11,16 +12,8 @@ Build-Depends: debhelper (>= 9),
libssl-dev, libssl-dev,
unixodbc-dev unixodbc-dev
Standards-Version: 3.8.0 Standards-Version: 3.8.0
Section: libs
Package: clickhouse-compressor
Section: libdevel
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
Description: clickhouse-compressor
Package: clickhouse-client Package: clickhouse-client
Section: libdevel
Architecture: any Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-server-base (= ${binary:Version}) Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-server-base (= ${binary:Version})
Description: Client binary for clickhouse Description: Client binary for clickhouse
@ -30,7 +23,6 @@ Description: Client binary for clickhouse
This package provides clickhouse-client , clickhouse-local and clickhouse-benchmark This package provides clickhouse-client , clickhouse-local and clickhouse-benchmark
Package: clickhouse-server-base Package: clickhouse-server-base
Section: libdevel
Architecture: any Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, tzdata Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, tzdata
Description: Server binary for clickhouse Description: Server binary for clickhouse
@ -40,7 +32,6 @@ Description: Server binary for clickhouse
This package provides clickhouse common configuration files This package provides clickhouse common configuration files
Package: clickhouse-server-common Package: clickhouse-server-common
Section: libdevel
Architecture: any Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-server-base (= ${binary:Version}) Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-server-base (= ${binary:Version})
Description: clickhouse-server-common Description: clickhouse-server-common
@ -54,10 +45,8 @@ Depends: ${misc:Depends}, clickhouse-server-base (= ${binary:Version})
Description: debugging symbols for clickhouse-server-base Description: debugging symbols for clickhouse-server-base
This package contains the debugging symbols for clickhouse-server-base. This package contains the debugging symbols for clickhouse-server-base.
Package: clickhouse-test Package: clickhouse-test
Section: Database #Priority: extra
Priority: extra
Architecture: any Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-client, clickhouse-compressor, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-client, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo
Description: Clickhouse tests Description: Clickhouse tests

24
debian/pbuilder-hooks/B90test-server vendored Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -e
set -x
for PKG in $(ls /tmp/buildd/*.deb | sed -e's,.*/,,;s,_.*,,' ); do
apt-get install -y --force-yes "$PKG" || true
apt-get remove -y "$PKG" || true
done
dpkg -i /tmp/buildd/*.deb || true
apt install -y -f --allow-downgrades
service clickhouse-server start
sleep 3
clickhouse-client -q "SELECT * from system.build_options;"
# --no-shard because default server listen only :: and 127.0.0.1
[ -n "$TEST_RUN" ] && clickhouse-test --no-shard --queries /usr/share/clickhouse-test/queries --tmp /tmp/clickhouse-test/ || true
service clickhouse-server stop

View File

@ -1 +0,0 @@
/usr/share/doc/pbuilder/examples/B92test-pkg

View File

@ -1,4 +0,0 @@
#!/bin/bash
service clickhouse-server start
sleep 3

View File

@ -1,6 +0,0 @@
#!/bin/bash
clickhouse-client -q "SELECT * from system.build_options;"
# --no-shard because default server listen only :: and 127.0.0.1
[ -n "$TEST_RUN" ] && clickhouse-test --no-shard --queries /usr/share/clickhouse-test/queries --tmp /tmp/clickhouse-test/ || true

View File

@ -1,3 +0,0 @@
#!/bin/bash
service clickhouse-server stop

View File

@ -1,9 +1,10 @@
find_package (Threads) find_package (Threads)
add_executable (clickhouse-compressor main.cpp) add_executable (util-clickhouse-compressor main.cpp)
target_link_libraries (clickhouse-compressor clickhouse-compressor-lib) target_link_libraries (util-clickhouse-compressor clickhouse-compressor-lib)
set_target_properties(util-clickhouse-compressor PROPERTIES OUTPUT_NAME "clickhouse-compressor")
install (TARGETS clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor) #install (TARGETS util-clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor)
add_executable (zstd_test zstd_test.cpp) add_executable (zstd_test zstd_test.cpp)
target_link_libraries (zstd_test ${ZSTD_LIBRARY} Threads::Threads) target_link_libraries (zstd_test ${ZSTD_LIBRARY} Threads::Threads)