mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 01:30:51 +00:00
Merge remote-tracking branch 'upstream/master' into system-log-prepare-table-on-each-flush
This commit is contained in:
commit
b236e148c5
@ -49,7 +49,8 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
|
||||
set (Poco_DataODBC_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/")
|
||||
endif ()
|
||||
|
||||
if (OPENSSL_FOUND)
|
||||
# TODO! fix internal ssl
|
||||
if (OPENSSL_FOUND AND NOT USE_INTERNAL_SSL_LIBRARY)
|
||||
set (Poco_NetSSL_FOUND 1)
|
||||
set (Poco_NetSSL_LIBRARY PocoNetSSL)
|
||||
set (Poco_Crypto_LIBRARY PocoCrypto)
|
||||
|
20
contrib/CMakeLists.txt
vendored
20
contrib/CMakeLists.txt
vendored
@ -100,13 +100,17 @@ if (USE_INTERNAL_RDKAFKA_LIBRARY)
|
||||
mark_as_advanced (ZLIB_INCLUDE_DIR)
|
||||
|
||||
if (USE_INTERNAL_SSL_LIBRARY)
|
||||
add_library(bundled-ssl ALIAS ${OPENSSL_SSL_LIBRARY})
|
||||
set (WITH_BUNDLED_SSL 1)
|
||||
if (MAKE_STATIC_LIBRARIES)
|
||||
add_library(bundled-ssl ALIAS ${OPENSSL_SSL_LIBRARY})
|
||||
set (WITH_BUNDLED_SSL 1 CACHE INTERNAL "")
|
||||
else ()
|
||||
set (WITH_SSL 0 CACHE INTERNAL "")
|
||||
endif ()
|
||||
endif ()
|
||||
|
||||
add_subdirectory (librdkafka)
|
||||
|
||||
if (USE_INTERNAL_SSL_LIBRARY)
|
||||
if (USE_INTERNAL_SSL_LIBRARY AND MAKE_STATIC_LIBRARIES)
|
||||
target_include_directories(rdkafka PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
|
||||
endif ()
|
||||
target_include_directories(rdkafka PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
|
||||
@ -127,16 +131,18 @@ endif ()
|
||||
|
||||
|
||||
if (USE_INTERNAL_POCO_LIBRARY)
|
||||
set (ALLOW_DUPLICATE_CUSTOM_TARGETS 1)
|
||||
set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
|
||||
set (save_CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
|
||||
set (_save ${ENABLE_TESTS})
|
||||
set (ENABLE_TESTS 0)
|
||||
set (CMAKE_DISABLE_FIND_PACKAGE_ZLIB 1)
|
||||
if (USE_INTERNAL_SSL_LIBRARY)
|
||||
set (DISABLE_INTERNAL_OPENSSL 1)
|
||||
set (ENABLE_NETSSL 0) # TODO!
|
||||
set (ENABLE_CRYPTO 0) # TODO!
|
||||
set (DISABLE_INTERNAL_OPENSSL 1 CACHE INTERNAL "")
|
||||
set (ENABLE_NETSSL 0 CACHE INTERNAL "") # TODO!
|
||||
set (ENABLE_CRYPTO 0 CACHE INTERNAL "") # TODO!
|
||||
endif ()
|
||||
if (MSVC)
|
||||
set (ENABLE_DATA_ODBC 0 CACHE INTERNAL "") # TODO (build fail)
|
||||
endif ()
|
||||
add_subdirectory (poco)
|
||||
unset (CMAKE_DISABLE_FIND_PACKAGE_ZLIB)
|
||||
|
2
contrib/boost
vendored
2
contrib/boost
vendored
@ -1 +1 @@
|
||||
Subproject commit eb5943711e88d1008583e6ae3720a5489313d02e
|
||||
Subproject commit 5121cc9d0375c7b81b24b6087a51684e6cd62ded
|
@ -5,7 +5,11 @@
|
||||
/* #undef AC_APPLE_UNIVERSAL_BUILD */
|
||||
|
||||
/* Define to 1 if the compiler supports __builtin_expect. */
|
||||
#if _MSC_VER
|
||||
#define HAVE_BUILTIN_EXPECT 0
|
||||
#else
|
||||
#define HAVE_BUILTIN_EXPECT 1
|
||||
#endif
|
||||
|
||||
/* Define to 1 if you have the <dlfcn.h> header file. */
|
||||
#define HAVE_DLFCN_H 1
|
||||
|
@ -2,4 +2,8 @@ add_library(farmhash
|
||||
farmhash.cc
|
||||
farmhash.h)
|
||||
|
||||
if (MSVC)
|
||||
target_compile_definitions (farmhash PRIVATE FARMHASH_NO_BUILTIN_EXPECT=1)
|
||||
endif ()
|
||||
|
||||
target_include_directories (farmhash PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
|
||||
|
@ -13,7 +13,11 @@
|
||||
/* #undef ENABLE_SIZED_DELETE */
|
||||
|
||||
/* Define to 1 if compiler supports __builtin_expect */
|
||||
#if _MSC_VER
|
||||
#define HAVE_BUILTIN_EXPECT 0
|
||||
#else
|
||||
#define HAVE_BUILTIN_EXPECT 1
|
||||
#endif
|
||||
|
||||
/* Define to 1 if compiler supports __builtin_stack_pointer */
|
||||
/* #undef HAVE_BUILTIN_STACK_POINTER */
|
||||
|
@ -131,7 +131,6 @@ target_link_libraries (clickhouse_common_io
|
||||
${LINK_LIBRARIES_ONLY_ON_X86_64}
|
||||
${LZ4_LIBRARY}
|
||||
${ZSTD_LIBRARY}
|
||||
${ZOOKEEPER_LIBRARY}
|
||||
${DOUBLE_CONVERSION_LIBRARIES}
|
||||
${Poco_Net_LIBRARY}
|
||||
${Poco_Data_LIBRARY}
|
||||
@ -158,10 +157,6 @@ if (NOT USE_INTERNAL_RE2_LIBRARY)
|
||||
target_include_directories (dbms BEFORE PRIVATE ${RE2_INCLUDE_DIR})
|
||||
endif ()
|
||||
|
||||
if (NOT USE_INTERNAL_ZOOKEEPER_LIBRARY)
|
||||
target_include_directories (clickhouse_common_io BEFORE PUBLIC ${ZOOKEEPER_INCLUDE_DIR})
|
||||
endif ()
|
||||
|
||||
if (NOT USE_INTERNAL_BOOST_LIBRARY)
|
||||
target_include_directories (clickhouse_common_io BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
|
||||
endif ()
|
||||
|
@ -1,6 +1,6 @@
|
||||
# This strings autochanged from release_lib.sh:
|
||||
set(VERSION_DESCRIBE v1.1.54370-testing)
|
||||
set(VERSION_REVISION 54370)
|
||||
set(VERSION_DESCRIBE v1.1.54371-testing)
|
||||
set(VERSION_REVISION 54371)
|
||||
# end of autochange
|
||||
|
||||
set (VERSION_MAJOR 1)
|
||||
|
@ -70,7 +70,10 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
|
||||
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
||||
}
|
||||
|
||||
return getImpl(name, argument_types, parameters, recursion_level);
|
||||
auto res = getImpl(name, argument_types, parameters, recursion_level);
|
||||
if (!res)
|
||||
throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
|
@ -16,7 +16,12 @@ namespace
|
||||
assertBinary(name, argument_types);
|
||||
assertNoParameters(name, parameters);
|
||||
|
||||
return AggregateFunctionPtr{createWithNumericType<AggregateFunctionIntersectionsMax>(*argument_types[0], kind, argument_types)};
|
||||
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionIntersectionsMax>(*argument_types[0], kind, argument_types));
|
||||
if (!res)
|
||||
throw Exception("Illegal types " + argument_types[0]->getName() + " and " + argument_types[1]->getName()
|
||||
+ " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,11 @@ AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, con
|
||||
values_types.push_back(array_type->getNestedType());
|
||||
}
|
||||
|
||||
return AggregateFunctionPtr(createWithNumericType<AggregateFunctionSumMap>(*keys_type, keys_type, std::move(values_types)));
|
||||
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionSumMap>(*keys_type, keys_type, std::move(values_types)));
|
||||
if (!res)
|
||||
throw Exception("Illegal type of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -148,6 +148,10 @@ void Connection::receiveHello()
|
||||
{
|
||||
readStringBinary(server_timezone, *in);
|
||||
}
|
||||
if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
|
||||
{
|
||||
readStringBinary(server_display_name, *in);
|
||||
}
|
||||
}
|
||||
else if (packet_type == Protocol::Server::Exception)
|
||||
receiveException()->rethrow();
|
||||
@ -203,6 +207,14 @@ const String & Connection::getServerTimezone()
|
||||
return server_timezone;
|
||||
}
|
||||
|
||||
const String & Connection::getServerDisplayName()
|
||||
{
|
||||
if (!connected)
|
||||
connect();
|
||||
|
||||
return server_display_name;
|
||||
}
|
||||
|
||||
void Connection::forceConnected()
|
||||
{
|
||||
if (!connected)
|
||||
|
@ -134,6 +134,7 @@ public:
|
||||
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision);
|
||||
|
||||
const String & getServerTimezone();
|
||||
const String & getServerDisplayName();
|
||||
|
||||
/// For log and exception messages.
|
||||
const String & getDescription() const;
|
||||
@ -213,6 +214,7 @@ private:
|
||||
UInt64 server_version_minor = 0;
|
||||
UInt64 server_revision = 0;
|
||||
String server_timezone;
|
||||
String server_display_name;
|
||||
|
||||
std::unique_ptr<Poco::Net::StreamSocket> socket;
|
||||
std::shared_ptr<ReadBuffer> in;
|
||||
|
@ -113,7 +113,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_size_hint) const
|
||||
{
|
||||
size_t size = getData().size();
|
||||
if (size != filter.size())
|
||||
@ -140,7 +140,7 @@ MutableColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
size_t size = getData().size();
|
||||
|
||||
@ -325,7 +325,7 @@ void ColumnAggregateFunction::popBack(size_t n)
|
||||
data.resize_assume_reserved(new_size);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnAggregateFunction::replicate(const IColumn::Offsets & offsets) const
|
||||
ColumnPtr ColumnAggregateFunction::replicate(const IColumn::Offsets & offsets) const
|
||||
{
|
||||
size_t size = data.size();
|
||||
if (size != offsets.size())
|
||||
|
@ -152,11 +152,11 @@ public:
|
||||
|
||||
void popBack(size_t n) override;
|
||||
|
||||
MutableColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;
|
||||
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;
|
||||
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
|
||||
MutableColumnPtr replicate(const Offsets & offsets) const override;
|
||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||
|
||||
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
|
||||
|
||||
|
@ -31,10 +31,10 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
ColumnArray::ColumnArray(const ColumnPtr & nested_column, const ColumnPtr & offsets_column)
|
||||
: data(nested_column), offsets(offsets_column)
|
||||
ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column)
|
||||
: data(std::move(nested_column)), offsets(std::move(offsets_column))
|
||||
{
|
||||
if (!typeid_cast<const ColumnOffsets *>(offsets_column.get()))
|
||||
if (!typeid_cast<const ColumnOffsets *>(offsets.get()))
|
||||
throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
/** NOTE
|
||||
@ -43,8 +43,8 @@ ColumnArray::ColumnArray(const ColumnPtr & nested_column, const ColumnPtr & offs
|
||||
*/
|
||||
}
|
||||
|
||||
ColumnArray::ColumnArray(const ColumnPtr & nested_column)
|
||||
: data(nested_column)
|
||||
ColumnArray::ColumnArray(MutableColumnPtr && nested_column)
|
||||
: data(std::move(nested_column))
|
||||
{
|
||||
if (!data->empty())
|
||||
throw Exception("Not empty data passed to ColumnArray, but no offsets passed", ErrorCodes::ILLEGAL_COLUMN);
|
||||
@ -317,7 +317,7 @@ bool ColumnArray::hasEqualOffsets(const ColumnArray & other) const
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::convertToFullColumnIfConst() const
|
||||
ColumnPtr ColumnArray::convertToFullColumnIfConst() const
|
||||
{
|
||||
ColumnPtr new_data;
|
||||
|
||||
@ -391,7 +391,7 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnArray::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
if (typeid_cast<const ColumnUInt8 *>(data.get())) return filterNumber<UInt8>(filt, result_size_hint);
|
||||
if (typeid_cast<const ColumnUInt16 *>(data.get())) return filterNumber<UInt16>(filt, result_size_hint);
|
||||
@ -410,7 +410,7 @@ MutableColumnPtr ColumnArray::filter(const Filter & filt, ssize_t result_size_hi
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
MutableColumnPtr ColumnArray::filterNumber(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnArray::filterNumber(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
if (getOffsets().size() == 0)
|
||||
return ColumnArray::create(data);
|
||||
@ -424,7 +424,7 @@ MutableColumnPtr ColumnArray::filterNumber(const Filter & filt, ssize_t result_s
|
||||
return std::move(res);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnArray::filterString(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnArray::filterString(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
size_t col_size = getOffsets().size();
|
||||
if (col_size != filt.size())
|
||||
@ -492,7 +492,7 @@ MutableColumnPtr ColumnArray::filterString(const Filter & filt, ssize_t result_s
|
||||
return std::move(res);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnArray::filterGeneric(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnArray::filterGeneric(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
size_t size = getOffsets().size();
|
||||
if (size != filt.size())
|
||||
@ -537,7 +537,7 @@ MutableColumnPtr ColumnArray::filterGeneric(const Filter & filt, ssize_t result_
|
||||
return std::move(res);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnArray::filterNullable(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnArray::filterNullable(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
if (getOffsets().size() == 0)
|
||||
return ColumnArray::create(data);
|
||||
@ -560,7 +560,7 @@ MutableColumnPtr ColumnArray::filterNullable(const Filter & filt, ssize_t result
|
||||
filtered_offsets);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
if (getOffsets().size() == 0)
|
||||
return ColumnArray::create(data);
|
||||
@ -576,7 +576,8 @@ MutableColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_si
|
||||
|
||||
Columns temporary_arrays(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
temporary_arrays[i] = ColumnArray(tuple.getColumns()[i], getOffsetsPtr()).filter(filt, result_size_hint);
|
||||
temporary_arrays[i] = ColumnArray(tuple.getColumns()[i]->assumeMutable(), getOffsetsPtr()->assumeMutable())
|
||||
.filter(filt, result_size_hint);
|
||||
|
||||
Columns tuple_columns(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
@ -588,7 +589,7 @@ MutableColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_si
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
size_t size = getOffsets().size();
|
||||
|
||||
@ -652,7 +653,7 @@ void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_h
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::replicate(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicate(const Offsets & replicate_offsets) const
|
||||
{
|
||||
if (typeid_cast<const ColumnUInt8 *>(data.get())) return replicateNumber<UInt8>(replicate_offsets);
|
||||
if (typeid_cast<const ColumnUInt16 *>(data.get())) return replicateNumber<UInt16>(replicate_offsets);
|
||||
@ -673,7 +674,7 @@ MutableColumnPtr ColumnArray::replicate(const Offsets & replicate_offsets) const
|
||||
|
||||
|
||||
template <typename T>
|
||||
MutableColumnPtr ColumnArray::replicateNumber(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicateNumber(const Offsets & replicate_offsets) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != replicate_offsets.size())
|
||||
@ -721,7 +722,7 @@ MutableColumnPtr ColumnArray::replicateNumber(const Offsets & replicate_offsets)
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != replicate_offsets.size())
|
||||
@ -796,7 +797,7 @@ MutableColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets)
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::replicateConst(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicateConst(const Offsets & replicate_offsets) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != replicate_offsets.size())
|
||||
@ -834,7 +835,7 @@ MutableColumnPtr ColumnArray::replicateConst(const Offsets & replicate_offsets)
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::replicateGeneric(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicateGeneric(const Offsets & replicate_offsets) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != replicate_offsets.size())
|
||||
@ -860,25 +861,27 @@ MutableColumnPtr ColumnArray::replicateGeneric(const Offsets & replicate_offsets
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::replicateNullable(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicateNullable(const Offsets & replicate_offsets) const
|
||||
{
|
||||
const ColumnNullable & nullable = static_cast<const ColumnNullable &>(*data);
|
||||
|
||||
/// Make temporary arrays for each components of Nullable. Then replicate them independently and collect back to result.
|
||||
/// NOTE Offsets are calculated twice and it is redundant.
|
||||
|
||||
auto array_of_nested = ColumnArray(nullable.getNestedColumnPtr(), getOffsetsPtr()).replicate(replicate_offsets);
|
||||
auto array_of_null_map = ColumnArray(nullable.getNullMapColumnPtr(), getOffsetsPtr()).replicate(replicate_offsets);
|
||||
auto array_of_nested = ColumnArray(nullable.getNestedColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable())
|
||||
.replicate(replicate_offsets);
|
||||
auto array_of_null_map = ColumnArray(nullable.getNullMapColumnPtr()->assumeMutable(), getOffsetsPtr()->assumeMutable())
|
||||
.replicate(replicate_offsets);
|
||||
|
||||
return ColumnArray::create(
|
||||
ColumnNullable::create(
|
||||
static_cast<ColumnArray &>(*array_of_nested).getDataPtr(),
|
||||
static_cast<ColumnArray &>(*array_of_null_map).getDataPtr()),
|
||||
static_cast<ColumnArray &>(*array_of_nested).getOffsetsPtr());
|
||||
static_cast<const ColumnArray &>(*array_of_nested).getDataPtr(),
|
||||
static_cast<const ColumnArray &>(*array_of_null_map).getDataPtr()),
|
||||
static_cast<const ColumnArray &>(*array_of_nested).getOffsetsPtr());
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnArray::replicateTuple(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnArray::replicateTuple(const Offsets & replicate_offsets) const
|
||||
{
|
||||
const ColumnTuple & tuple = static_cast<const ColumnTuple &>(*data);
|
||||
|
||||
@ -891,7 +894,8 @@ MutableColumnPtr ColumnArray::replicateTuple(const Offsets & replicate_offsets)
|
||||
|
||||
Columns temporary_arrays(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
temporary_arrays[i] = ColumnArray(tuple.getColumns()[i], getOffsetsPtr()).replicate(replicate_offsets);
|
||||
temporary_arrays[i] = ColumnArray(tuple.getColumns()[i]->assumeMutable(), getOffsetsPtr()->assumeMutable())
|
||||
.replicate(replicate_offsets);
|
||||
|
||||
Columns tuple_columns(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
|
@ -24,14 +24,32 @@ private:
|
||||
friend class COWPtrHelper<IColumn, ColumnArray>;
|
||||
|
||||
/** Create an array column with specified values and offsets. */
|
||||
ColumnArray(const ColumnPtr & nested_column, const ColumnPtr & offsets_column);
|
||||
ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column);
|
||||
|
||||
/** Create an empty column of arrays with the type of values as in the column `nested_column` */
|
||||
ColumnArray(const ColumnPtr & nested_column);
|
||||
explicit ColumnArray(MutableColumnPtr && nested_column);
|
||||
|
||||
ColumnArray(const ColumnArray &) = default;
|
||||
|
||||
public:
|
||||
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
|
||||
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
|
||||
*/
|
||||
using Base = COWPtrHelper<IColumn, ColumnArray>;
|
||||
|
||||
static Ptr create(const ColumnPtr & nested_column, const ColumnPtr & offsets_column)
|
||||
{
|
||||
return ColumnArray::create(nested_column->assumeMutable(), offsets_column->assumeMutable());
|
||||
}
|
||||
|
||||
static Ptr create(const ColumnPtr & nested_column)
|
||||
{
|
||||
return ColumnArray::create(nested_column->assumeMutable());
|
||||
}
|
||||
|
||||
template <typename ... Args, typename = typename std::enable_if<IsMutableColumns<Args ...>::value>::type>
|
||||
static MutablePtr create(Args &&... args) { return Base::create(std::forward<Args>(args)...); }
|
||||
|
||||
/** On the index i there is an offset to the beginning of the i + 1 -th element. */
|
||||
using ColumnOffsets = ColumnVector<Offset>;
|
||||
|
||||
@ -51,15 +69,15 @@ public:
|
||||
void insertFrom(const IColumn & src_, size_t n) override;
|
||||
void insertDefault() override;
|
||||
void popBack(size_t n) override;
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
|
||||
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
|
||||
void reserve(size_t n) override;
|
||||
size_t byteSize() const override;
|
||||
size_t allocatedBytes() const override;
|
||||
MutableColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
MutableColumnPtr convertToFullColumnIfConst() const override;
|
||||
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
ColumnPtr convertToFullColumnIfConst() const override;
|
||||
void getExtremes(Field & min, Field & max) const override;
|
||||
|
||||
bool hasEqualOffsets(const ColumnArray & other) const;
|
||||
@ -110,33 +128,33 @@ private:
|
||||
|
||||
/// Multiply values if the nested column is ColumnVector<T>.
|
||||
template <typename T>
|
||||
MutableColumnPtr replicateNumber(const Offsets & replicate_offsets) const;
|
||||
ColumnPtr replicateNumber(const Offsets & replicate_offsets) const;
|
||||
|
||||
/// Multiply the values if the nested column is ColumnString. The code is too complicated.
|
||||
MutableColumnPtr replicateString(const Offsets & replicate_offsets) const;
|
||||
ColumnPtr replicateString(const Offsets & replicate_offsets) const;
|
||||
|
||||
/** Non-constant arrays of constant values are quite rare.
|
||||
* Most functions can not work with them, and does not create such columns as a result.
|
||||
* An exception is the function `replicate`(see FunctionsMiscellaneous.h), which has service meaning for the implementation of lambda functions.
|
||||
* Only for its sake is the implementation of the `replicate` method for ColumnArray(ColumnConst).
|
||||
*/
|
||||
MutableColumnPtr replicateConst(const Offsets & replicate_offsets) const;
|
||||
ColumnPtr replicateConst(const Offsets & replicate_offsets) const;
|
||||
|
||||
/** The following is done by simply replicating of nested columns.
|
||||
*/
|
||||
MutableColumnPtr replicateTuple(const Offsets & replicate_offsets) const;
|
||||
MutableColumnPtr replicateNullable(const Offsets & replicate_offsets) const;
|
||||
MutableColumnPtr replicateGeneric(const Offsets & replicate_offsets) const;
|
||||
ColumnPtr replicateTuple(const Offsets & replicate_offsets) const;
|
||||
ColumnPtr replicateNullable(const Offsets & replicate_offsets) const;
|
||||
ColumnPtr replicateGeneric(const Offsets & replicate_offsets) const;
|
||||
|
||||
|
||||
/// Specializations for the filter function.
|
||||
template <typename T>
|
||||
MutableColumnPtr filterNumber(const Filter & filt, ssize_t result_size_hint) const;
|
||||
ColumnPtr filterNumber(const Filter & filt, ssize_t result_size_hint) const;
|
||||
|
||||
MutableColumnPtr filterString(const Filter & filt, ssize_t result_size_hint) const;
|
||||
MutableColumnPtr filterTuple(const Filter & filt, ssize_t result_size_hint) const;
|
||||
MutableColumnPtr filterNullable(const Filter & filt, ssize_t result_size_hint) const;
|
||||
MutableColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const;
|
||||
ColumnPtr filterString(const Filter & filt, ssize_t result_size_hint) const;
|
||||
ColumnPtr filterTuple(const Filter & filt, ssize_t result_size_hint) const;
|
||||
ColumnPtr filterNullable(const Filter & filt, ssize_t result_size_hint) const;
|
||||
ColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const;
|
||||
};
|
||||
|
||||
|
||||
|
@ -25,12 +25,12 @@ ColumnConst::ColumnConst(const ColumnPtr & data_, size_t s)
|
||||
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnConst::convertToFullColumn() const
|
||||
ColumnPtr ColumnConst::convertToFullColumn() const
|
||||
{
|
||||
return data->replicate(Offsets(1, s));
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnConst::filter(const Filter & filt, ssize_t /*result_size_hint*/) const
|
||||
ColumnPtr ColumnConst::filter(const Filter & filt, ssize_t /*result_size_hint*/) const
|
||||
{
|
||||
if (s != filt.size())
|
||||
throw Exception("Size of filter (" + toString(filt.size()) + ") doesn't match size of column (" + toString(s) + ")",
|
||||
@ -39,7 +39,7 @@ MutableColumnPtr ColumnConst::filter(const Filter & filt, ssize_t /*result_size_
|
||||
return ColumnConst::create(data, countBytesInFilter(filt));
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnConst::replicate(const Offsets & offsets) const
|
||||
ColumnPtr ColumnConst::replicate(const Offsets & offsets) const
|
||||
{
|
||||
if (s != offsets.size())
|
||||
throw Exception("Size of offsets (" + toString(offsets.size()) + ") doesn't match size of column (" + toString(s) + ")",
|
||||
@ -49,7 +49,7 @@ MutableColumnPtr ColumnConst::replicate(const Offsets & offsets) const
|
||||
return ColumnConst::create(data, replicated_size);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnConst::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnConst::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
if (limit == 0)
|
||||
limit = s;
|
||||
|
@ -29,9 +29,9 @@ private:
|
||||
ColumnConst(const ColumnConst & src) = default;
|
||||
|
||||
public:
|
||||
MutableColumnPtr convertToFullColumn() const;
|
||||
ColumnPtr convertToFullColumn() const;
|
||||
|
||||
MutableColumnPtr convertToFullColumnIfConst() const override
|
||||
ColumnPtr convertToFullColumnIfConst() const override
|
||||
{
|
||||
return convertToFullColumn();
|
||||
}
|
||||
@ -145,9 +145,9 @@ public:
|
||||
data->updateHashWithValue(0, hash);
|
||||
}
|
||||
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
MutableColumnPtr replicate(const Offsets & offsets) const override;
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
|
||||
|
||||
size_t byteSize() const override
|
||||
|
@ -153,7 +153,7 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
|
||||
memcpy(&chars[old_size], &src_concrete.chars[start * n], length * n);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != filt.size())
|
||||
@ -230,7 +230,7 @@ MutableColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t
|
||||
return std::move(res);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
|
||||
@ -258,7 +258,7 @@ MutableColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t lim
|
||||
return std::move(res);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFixedString::replicate(const Offsets & offsets) const
|
||||
ColumnPtr ColumnFixedString::replicate(const Offsets & offsets) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != offsets.size())
|
||||
|
@ -104,11 +104,11 @@ public:
|
||||
|
||||
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
|
||||
|
||||
MutableColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
|
||||
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
|
||||
MutableColumnPtr replicate(const Offsets & offsets) const override;
|
||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||
|
||||
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
|
||||
{
|
||||
|
@ -26,7 +26,7 @@ MutableColumnPtr ColumnFunction::cloneResized(size_t size) const
|
||||
return ColumnFunction::create(size, function, capture);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFunction::replicate(const Offsets & offsets) const
|
||||
ColumnPtr ColumnFunction::replicate(const Offsets & offsets) const
|
||||
{
|
||||
if (size_ != offsets.size())
|
||||
throw Exception("Size of offsets (" + toString(offsets.size()) + ") doesn't match size of column ("
|
||||
@ -40,7 +40,7 @@ MutableColumnPtr ColumnFunction::replicate(const Offsets & offsets) const
|
||||
return ColumnFunction::create(replicated_size, function, capture);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFunction::cut(size_t start, size_t length) const
|
||||
ColumnPtr ColumnFunction::cut(size_t start, size_t length) const
|
||||
{
|
||||
ColumnsWithTypeAndName capture = captured_columns;
|
||||
for (auto & column : capture)
|
||||
@ -49,7 +49,7 @@ MutableColumnPtr ColumnFunction::cut(size_t start, size_t length) const
|
||||
return ColumnFunction::create(length, function, capture);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFunction::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnFunction::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
if (size_ != filt.size())
|
||||
throw Exception("Size of filter (" + toString(filt.size()) + ") doesn't match size of column ("
|
||||
@ -68,7 +68,7 @@ MutableColumnPtr ColumnFunction::filter(const Filter & filt, ssize_t result_size
|
||||
return ColumnFunction::create(filtered_size, function, capture);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnFunction::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnFunction::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
if (limit == 0)
|
||||
limit = size_;
|
||||
|
@ -29,10 +29,10 @@ public:
|
||||
|
||||
size_t size() const override { return size_; }
|
||||
|
||||
MutableColumnPtr cut(size_t start, size_t length) const override;
|
||||
MutableColumnPtr replicate(const Offsets & offsets) const override;
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr cut(size_t start, size_t length) const override;
|
||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
void insertDefault() override;
|
||||
void popBack(size_t n) override;
|
||||
std::vector<MutableColumnPtr> scatter(IColumn::ColumnIndex num_columns,
|
||||
|
@ -18,8 +18,8 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
ColumnNullable::ColumnNullable(const ColumnPtr & nested_column_, const ColumnPtr & null_map_)
|
||||
: nested_column{nested_column_}, null_map{null_map_}
|
||||
ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumnPtr && null_map_)
|
||||
: nested_column(std::move(nested_column_)), null_map(std::move(null_map_))
|
||||
{
|
||||
/// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
|
||||
if (ColumnPtr nested_column_materialized = getNestedColumn().convertToFullColumnIfConst())
|
||||
@ -44,7 +44,7 @@ void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const
|
||||
|
||||
MutableColumnPtr ColumnNullable::cloneResized(size_t new_size) const
|
||||
{
|
||||
ColumnPtr new_nested_col = getNestedColumn().cloneResized(new_size);
|
||||
MutableColumnPtr new_nested_col = getNestedColumn().cloneResized(new_size);
|
||||
auto new_null_map = ColumnUInt8::create();
|
||||
|
||||
if (new_size > 0)
|
||||
@ -59,7 +59,7 @@ MutableColumnPtr ColumnNullable::cloneResized(size_t new_size) const
|
||||
memset(&new_null_map->getData()[count], 1, new_size - count);
|
||||
}
|
||||
|
||||
return ColumnNullable::create(new_nested_col, std::move(new_null_map));
|
||||
return ColumnNullable::create(std::move(new_nested_col), std::move(new_null_map));
|
||||
}
|
||||
|
||||
|
||||
@ -152,14 +152,14 @@ void ColumnNullable::popBack(size_t n)
|
||||
getNullMapColumn().popBack(n);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);
|
||||
ColumnPtr filtered_null_map = getNullMapColumn().filter(filt, result_size_hint);
|
||||
return ColumnNullable::create(filtered_data, filtered_null_map);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnNullable::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnNullable::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
ColumnPtr permuted_data = getNestedColumn().permute(perm, limit);
|
||||
ColumnPtr permuted_null_map = getNullMapColumn().permute(perm, limit);
|
||||
@ -384,7 +384,7 @@ void ColumnNullable::getExtremes(Field & min, Field & max) const
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnNullable::replicate(const Offsets & offsets) const
|
||||
ColumnPtr ColumnNullable::replicate(const Offsets & offsets) const
|
||||
{
|
||||
ColumnPtr replicated_data = getNestedColumn().replicate(offsets);
|
||||
ColumnPtr replicated_null_map = getNullMapColumn().replicate(offsets);
|
||||
|
@ -23,10 +23,22 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
|
||||
private:
|
||||
friend class COWPtrHelper<IColumn, ColumnNullable>;
|
||||
|
||||
ColumnNullable(const ColumnPtr & nested_column_, const ColumnPtr & null_map_);
|
||||
ColumnNullable(MutableColumnPtr && nested_column_, MutableColumnPtr && null_map_);
|
||||
ColumnNullable(const ColumnNullable &) = default;
|
||||
|
||||
public:
|
||||
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
|
||||
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
|
||||
*/
|
||||
using Base = COWPtrHelper<IColumn, ColumnNullable>;
|
||||
static Ptr create(const ColumnPtr & nested_column_, const ColumnPtr & null_map_)
|
||||
{
|
||||
return ColumnNullable::create(nested_column_->assumeMutable(), null_map_->assumeMutable());
|
||||
}
|
||||
|
||||
template <typename ... Args, typename = typename std::enable_if<IsMutableColumns<Args ...>::value>::type>
|
||||
static MutablePtr create(Args &&... args) { return Base::create(std::forward<Args>(args)...); }
|
||||
|
||||
const char * getFamilyName() const override { return "Nullable"; }
|
||||
std::string getName() const override { return "Nullable(" + nested_column->getName() + ")"; }
|
||||
MutableColumnPtr cloneResized(size_t size) const override;
|
||||
@ -50,14 +62,14 @@ public:
|
||||
}
|
||||
|
||||
void popBack(size_t n) override;
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const override;
|
||||
void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
|
||||
void reserve(size_t n) override;
|
||||
size_t byteSize() const override;
|
||||
size_t allocatedBytes() const override;
|
||||
MutableColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
void updateHashWithValue(size_t n, SipHash & hash) const override;
|
||||
void getExtremes(Field & min, Field & max) const override;
|
||||
|
||||
|
@ -97,7 +97,7 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnString::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnString::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
if (offsets.size() == 0)
|
||||
return ColumnString::create();
|
||||
@ -112,7 +112,7 @@ MutableColumnPtr ColumnString::filter(const Filter & filt, ssize_t result_size_h
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
size_t size = offsets.size();
|
||||
|
||||
@ -208,7 +208,7 @@ void ColumnString::getPermutation(bool reverse, size_t limit, int /*nan_directio
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr ColumnString::replicate(const Offsets & replicate_offsets) const
|
||||
ColumnPtr ColumnString::replicate(const Offsets & replicate_offsets) const
|
||||
{
|
||||
size_t col_size = size();
|
||||
if (col_size != replicate_offsets.size())
|
||||
|
@ -206,9 +206,9 @@ public:
|
||||
|
||||
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
|
||||
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
|
||||
void insertDefault() override
|
||||
{
|
||||
@ -239,7 +239,7 @@ public:
|
||||
/// Sorting with respect of collation.
|
||||
void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, Permutation & res) const;
|
||||
|
||||
MutableColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
|
||||
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
|
||||
{
|
||||
|
@ -31,21 +31,38 @@ std::string ColumnTuple::getName() const
|
||||
return res.str();
|
||||
}
|
||||
|
||||
ColumnTuple::ColumnTuple(const Columns & columns) : columns(columns)
|
||||
ColumnTuple::ColumnTuple(MutableColumns && mutable_columns)
|
||||
{
|
||||
columns.reserve(mutable_columns.size());
|
||||
for (auto & column : mutable_columns)
|
||||
{
|
||||
if (column->isColumnConst())
|
||||
throw Exception{"ColumnTuple cannot have ColumnConst as its element", ErrorCodes::ILLEGAL_COLUMN};
|
||||
|
||||
columns.push_back(std::move(column));
|
||||
}
|
||||
}
|
||||
|
||||
ColumnTuple::Ptr ColumnTuple::create(const Columns & columns)
|
||||
{
|
||||
for (const auto & column : columns)
|
||||
if (column->isColumnConst())
|
||||
throw Exception{"ColumnTuple cannot have ColumnConst as its element", ErrorCodes::ILLEGAL_COLUMN};
|
||||
|
||||
auto column_tuple = ColumnTuple::create(MutableColumns());
|
||||
column_tuple->columns = columns;
|
||||
|
||||
return std::move(column_tuple);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnTuple::cloneEmpty() const
|
||||
{
|
||||
const size_t tuple_size = columns.size();
|
||||
Columns new_columns(tuple_size);
|
||||
MutableColumns new_columns(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
new_columns[i] = columns[i]->cloneEmpty();
|
||||
|
||||
return ColumnTuple::create(new_columns);
|
||||
return ColumnTuple::create(std::move(new_columns));
|
||||
}
|
||||
|
||||
Field ColumnTuple::operator[](size_t n) const
|
||||
@ -140,7 +157,7 @@ void ColumnTuple::insertRangeFrom(const IColumn & src, size_t start, size_t leng
|
||||
start, length);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnTuple::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnTuple::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
const size_t tuple_size = columns.size();
|
||||
Columns new_columns(tuple_size);
|
||||
@ -151,7 +168,7 @@ MutableColumnPtr ColumnTuple::filter(const Filter & filt, ssize_t result_size_hi
|
||||
return ColumnTuple::create(new_columns);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnTuple::permute(const Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnTuple::permute(const Permutation & perm, size_t limit) const
|
||||
{
|
||||
const size_t tuple_size = columns.size();
|
||||
Columns new_columns(tuple_size);
|
||||
@ -162,7 +179,7 @@ MutableColumnPtr ColumnTuple::permute(const Permutation & perm, size_t limit) co
|
||||
return ColumnTuple::create(new_columns);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnTuple::replicate(const Offsets & offsets) const
|
||||
ColumnPtr ColumnTuple::replicate(const Offsets & offsets) const
|
||||
{
|
||||
const size_t tuple_size = columns.size();
|
||||
Columns new_columns(tuple_size);
|
||||
@ -185,10 +202,10 @@ MutableColumns ColumnTuple::scatter(ColumnIndex num_columns, const Selector & se
|
||||
|
||||
for (size_t scattered_idx = 0; scattered_idx < num_columns; ++scattered_idx)
|
||||
{
|
||||
Columns new_columns(tuple_size);
|
||||
MutableColumns new_columns(tuple_size);
|
||||
for (size_t tuple_element_idx = 0; tuple_element_idx < tuple_size; ++tuple_element_idx)
|
||||
new_columns[tuple_element_idx] = std::move(scattered_tuple_elements[tuple_element_idx][scattered_idx]);
|
||||
res[scattered_idx] = ColumnTuple::create(new_columns);
|
||||
res[scattered_idx] = ColumnTuple::create(std::move(new_columns));
|
||||
}
|
||||
|
||||
return res;
|
||||
|
@ -22,10 +22,19 @@ private:
|
||||
template <bool positive>
|
||||
struct Less;
|
||||
|
||||
ColumnTuple(const Columns & columns);
|
||||
explicit ColumnTuple(MutableColumns && columns);
|
||||
ColumnTuple(const ColumnTuple &) = default;
|
||||
|
||||
public:
|
||||
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
|
||||
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
|
||||
*/
|
||||
using Base = COWPtrHelper<IColumn, ColumnTuple>;
|
||||
static Ptr create(const Columns & columns);
|
||||
|
||||
template <typename Arg, typename = typename std::enable_if<std::is_rvalue_reference<Arg &&>::value>::type>
|
||||
static MutablePtr create(Arg && arg) { return Base::create(std::forward<Arg>(arg)); }
|
||||
|
||||
std::string getName() const override;
|
||||
const char * getFamilyName() const override { return "Tuple"; }
|
||||
|
||||
@ -49,9 +58,9 @@ public:
|
||||
const char * deserializeAndInsertFromArena(const char * pos) override;
|
||||
void updateHashWithValue(size_t n, SipHash & hash) const override;
|
||||
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
MutableColumnPtr replicate(const Offsets & offsets) const override;
|
||||
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
|
||||
void gather(ColumnGathererStream & gatherer_stream) override;
|
||||
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
|
||||
|
@ -146,7 +146,7 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
MutableColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
|
||||
ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
size_t size = data.size();
|
||||
if (size != filt.size())
|
||||
@ -210,7 +210,7 @@ MutableColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t r
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
MutableColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t limit) const
|
||||
ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t limit) const
|
||||
{
|
||||
size_t size = data.size();
|
||||
|
||||
@ -231,7 +231,7 @@ MutableColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, siz
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
MutableColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
|
||||
ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
|
||||
{
|
||||
size_t size = data.size();
|
||||
if (size != offsets.size())
|
||||
|
@ -243,11 +243,11 @@ public:
|
||||
|
||||
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
|
||||
|
||||
MutableColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
|
||||
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
|
||||
|
||||
MutableColumnPtr permute(const IColumn::Permutation & perm, size_t limit) const override;
|
||||
ColumnPtr permute(const IColumn::Permutation & perm, size_t limit) const override;
|
||||
|
||||
MutableColumnPtr replicate(const IColumn::Offsets & offsets) const override;
|
||||
ColumnPtr replicate(const IColumn::Offsets & offsets) const override;
|
||||
|
||||
void getExtremes(Field & min, Field & max) const override;
|
||||
|
||||
|
@ -57,7 +57,8 @@ FilterDescription::FilterDescription(const IColumn & column)
|
||||
|
||||
if (const ColumnNullable * nullable_column = typeid_cast<const ColumnNullable *>(&column))
|
||||
{
|
||||
MutableColumnPtr mutable_holder = nullable_column->getNestedColumn().mutate();
|
||||
ColumnPtr nested_column = nullable_column->getNestedColumnPtr();
|
||||
MutableColumnPtr mutable_holder = (*std::move(nested_column)).mutate();
|
||||
|
||||
ColumnUInt8 * concrete_column = typeid_cast<ColumnUInt8 *>(mutable_holder.get());
|
||||
if (!concrete_column)
|
||||
|
@ -45,7 +45,7 @@ public:
|
||||
/** If column isn't constant, returns nullptr (or itself).
|
||||
* If column is constant, transforms constant to full column (if column type allows such tranform) and return it.
|
||||
*/
|
||||
virtual MutablePtr convertToFullColumnIfConst() const { return {}; }
|
||||
virtual Ptr convertToFullColumnIfConst() const { return {}; }
|
||||
|
||||
/// Creates empty column with the same type.
|
||||
virtual MutablePtr cloneEmpty() const { return cloneResized(0); }
|
||||
@ -104,11 +104,11 @@ public:
|
||||
|
||||
/// Removes all elements outside of specified range.
|
||||
/// Is used in LIMIT operation, for example.
|
||||
virtual MutablePtr cut(size_t start, size_t length) const
|
||||
virtual Ptr cut(size_t start, size_t length) const
|
||||
{
|
||||
MutablePtr res = cloneEmpty();
|
||||
res->insertRangeFrom(*this, start, length);
|
||||
return res;
|
||||
return std::move(res);
|
||||
}
|
||||
|
||||
/// Appends new value at the end of column (column's size is increased by 1).
|
||||
@ -171,12 +171,12 @@ public:
|
||||
* otherwise (i.e. < 0), makes reserve() using size of source column.
|
||||
*/
|
||||
using Filter = PaddedPODArray<UInt8>;
|
||||
virtual MutablePtr filter(const Filter & filt, ssize_t result_size_hint) const = 0;
|
||||
virtual Ptr filter(const Filter & filt, ssize_t result_size_hint) const = 0;
|
||||
|
||||
/// Permutes elements using specified permutation. Is used in sortings.
|
||||
/// limit - if it isn't 0, puts only first limit elements in the result.
|
||||
using Permutation = PaddedPODArray<size_t>;
|
||||
virtual MutablePtr permute(const Permutation & perm, size_t limit) const = 0;
|
||||
virtual Ptr permute(const Permutation & perm, size_t limit) const = 0;
|
||||
|
||||
/** Compares (*this)[n] and rhs[m].
|
||||
* Returns negative number, 0, or positive number (*this)[n] is less, equal, greater than rhs[m] respectively.
|
||||
@ -205,7 +205,7 @@ public:
|
||||
*/
|
||||
using Offset = UInt64;
|
||||
using Offsets = PaddedPODArray<Offset>;
|
||||
virtual MutablePtr replicate(const Offsets & offsets) const = 0;
|
||||
virtual Ptr replicate(const Offsets & offsets) const = 0;
|
||||
|
||||
/** Split column to smaller columns. Each value goes to column index, selected by corresponding element of 'selector'.
|
||||
* Selector must contain values from 0 to num_columns - 1.
|
||||
@ -247,10 +247,10 @@ public:
|
||||
virtual void forEachSubcolumn(ColumnCallback) {}
|
||||
|
||||
|
||||
MutablePtr mutate() const
|
||||
MutablePtr mutate() const &&
|
||||
{
|
||||
MutablePtr res = COWPtr<IColumn>::mutate();
|
||||
res->forEachSubcolumn([](Ptr & subcolumn) { subcolumn = subcolumn->mutate(); });
|
||||
res->forEachSubcolumn([](Ptr & subcolumn) { subcolumn = (*std::move(subcolumn)).mutate(); });
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -360,4 +360,16 @@ using MutableColumns = std::vector<MutableColumnPtr>;
|
||||
using ColumnRawPtrs = std::vector<const IColumn *>;
|
||||
//using MutableColumnRawPtrs = std::vector<IColumn *>;
|
||||
|
||||
template <typename ... Args>
|
||||
struct IsMutableColumns;
|
||||
|
||||
template <typename Arg, typename ... Args>
|
||||
struct IsMutableColumns<Arg, Args ...>
|
||||
{
|
||||
static const bool value = std::is_assignable<MutableColumnPtr &&, Arg>::value && IsMutableColumns<Args ...>::value;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct IsMutableColumns<> { static const bool value = true; };
|
||||
|
||||
}
|
||||
|
@ -74,12 +74,12 @@ public:
|
||||
s += length;
|
||||
}
|
||||
|
||||
MutableColumnPtr filter(const Filter & filt, ssize_t /*result_size_hint*/) const override
|
||||
ColumnPtr filter(const Filter & filt, ssize_t /*result_size_hint*/) const override
|
||||
{
|
||||
return cloneDummy(countBytesInFilter(filt));
|
||||
}
|
||||
|
||||
MutableColumnPtr permute(const Permutation & perm, size_t limit) const override
|
||||
ColumnPtr permute(const Permutation & perm, size_t limit) const override
|
||||
{
|
||||
if (s != perm.size())
|
||||
throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
@ -94,7 +94,7 @@ public:
|
||||
res[i] = i;
|
||||
}
|
||||
|
||||
MutableColumnPtr replicate(const Offsets & offsets) const override
|
||||
ColumnPtr replicate(const Offsets & offsets) const override
|
||||
{
|
||||
if (s != offsets.size())
|
||||
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__))
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
@ -68,10 +68,9 @@
|
||||
* of this shared state.
|
||||
*
|
||||
* Caveats:
|
||||
* - after a call to 'mutate' method, you can still have a reference to immutable ptr somewhere
|
||||
* and it can still become shared. Also it would be better to make 'mutate' method rvalue-qualified.
|
||||
* - after a call to 'mutate' method, you can still have a reference to immutable ptr somewhere.
|
||||
* - as 'mutable_ptr' should be unique, it's refcount is redundant - probably it would be better
|
||||
* to use std::unique_ptr for it, but see above.
|
||||
* to use std::unique_ptr for it somehow.
|
||||
*/
|
||||
template <typename Derived>
|
||||
class COWPtr : public boost::intrusive_ref_counter<Derived>
|
||||
@ -80,12 +79,22 @@ private:
|
||||
Derived * derived() { return static_cast<Derived *>(this); }
|
||||
const Derived * derived() const { return static_cast<const Derived *>(this); }
|
||||
|
||||
template <typename T>
|
||||
class IntrusivePtr : public boost::intrusive_ptr<T>
|
||||
{
|
||||
public:
|
||||
using boost::intrusive_ptr<T>::intrusive_ptr;
|
||||
|
||||
T & operator*() const & { return boost::intrusive_ptr<T>::operator*(); }
|
||||
T && operator*() const && { return const_cast<typename std::remove_const<T>::type &&>(*boost::intrusive_ptr<T>::get()); }
|
||||
};
|
||||
|
||||
protected:
|
||||
template <typename T>
|
||||
class mutable_ptr : public boost::intrusive_ptr<T>
|
||||
class mutable_ptr : public IntrusivePtr<T>
|
||||
{
|
||||
private:
|
||||
using Base = boost::intrusive_ptr<T>;
|
||||
using Base = IntrusivePtr<T>;
|
||||
|
||||
template <typename> friend class COWPtr;
|
||||
template <typename, typename> friend class COWPtrHelper;
|
||||
@ -114,10 +123,10 @@ public:
|
||||
|
||||
protected:
|
||||
template <typename T>
|
||||
class immutable_ptr : public boost::intrusive_ptr<const T>
|
||||
class immutable_ptr : public IntrusivePtr<const T>
|
||||
{
|
||||
private:
|
||||
using Base = boost::intrusive_ptr<const T>;
|
||||
using Base = IntrusivePtr<const T>;
|
||||
|
||||
template <typename> friend class COWPtr;
|
||||
template <typename, typename> friend class COWPtrHelper;
|
||||
|
@ -1,13 +1,12 @@
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <dlfcn.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <IO/WriteBufferFromVector.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <port/unistd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -3,6 +3,9 @@
|
||||
#include <Core/Types.h>
|
||||
#include <Common/BitHelpers.h>
|
||||
|
||||
#if __SSE2__
|
||||
#include <emmintrin.h>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -49,9 +52,29 @@ inline size_t seqLength(const UInt8 first_octet)
|
||||
inline size_t countCodePoints(const UInt8 * data, size_t size)
|
||||
{
|
||||
size_t res = 0;
|
||||
const auto end = data + size;
|
||||
|
||||
/// TODO SIMD implementation looks quite simple.
|
||||
for (auto end = data + size; data < end; ++data) /// Skip UTF-8 continuation bytes.
|
||||
#if __SSE2__
|
||||
const auto bytes_sse = sizeof(__m128i);
|
||||
const auto src_end_sse = (data + size) - (size % bytes_sse);
|
||||
|
||||
const auto align_sse = _mm_set1_epi8(0x40);
|
||||
const auto upper_bound = _mm_set1_epi8(0xBF);
|
||||
|
||||
for (; data < src_end_sse; data += bytes_sse)
|
||||
{
|
||||
const auto chars = _mm_loadu_si128(reinterpret_cast<const __m128i *>(data));
|
||||
|
||||
///Align to zero for the solve two case
|
||||
const auto align_res = _mm_adds_epu8(chars, align_sse);
|
||||
const auto less_than_and_equals = _mm_cmpeq_epi8(_mm_min_epu8(align_res, upper_bound), align_res);
|
||||
|
||||
res += __builtin_popcount(_mm_movemask_epi8(less_than_and_equals));
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
for (; data < end; ++data) /// Skip UTF-8 continuation bytes.
|
||||
res += (*data <= 0x7F || *data >= 0xC0);
|
||||
|
||||
return res;
|
||||
|
@ -5,7 +5,11 @@ add_headers_and_sources(clickhouse_common_zookeeper .)
|
||||
|
||||
add_library(clickhouse_common_zookeeper ${SPLIT_SHARED} ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
|
||||
|
||||
target_link_libraries (clickhouse_common_zookeeper clickhouse_common_io)
|
||||
if (NOT USE_INTERNAL_ZOOKEEPER_LIBRARY)
|
||||
target_include_directories (clickhouse_common_zookeeper BEFORE PUBLIC ${ZOOKEEPER_INCLUDE_DIR})
|
||||
endif ()
|
||||
|
||||
target_link_libraries (clickhouse_common_zookeeper clickhouse_common_io ${ZOOKEEPER_LIBRARY})
|
||||
|
||||
if (ENABLE_TESTS)
|
||||
add_subdirectory (tests)
|
||||
|
@ -8,10 +8,10 @@
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <port/unistd.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#include <port/unistd.h>
|
||||
|
||||
using namespace zkutil;
|
||||
|
||||
|
@ -1,13 +1,10 @@
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <Poco/File.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <port/unistd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -1,12 +1,10 @@
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <port/unistd.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
#ifdef __APPLE__
|
||||
#include <common/apple_rt.h>
|
||||
#endif
|
||||
|
@ -7,15 +7,13 @@
|
||||
#include <Common/CompactArray.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
#include <cstdlib>
|
||||
#include <port/unistd.h>
|
||||
|
||||
namespace fs = boost::filesystem;
|
||||
|
||||
|
@ -312,7 +312,7 @@ MutableColumns Block::mutateColumns() const
|
||||
size_t num_columns = data.size();
|
||||
MutableColumns columns(num_columns);
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
columns[i] = data[i].column ? data[i].column->mutate() : data[i].type->createColumn();
|
||||
columns[i] = data[i].column ? (*std::move(data[i].column)).mutate() : data[i].type->createColumn();
|
||||
return columns;
|
||||
}
|
||||
|
||||
|
@ -109,7 +109,7 @@ public:
|
||||
/** Get empty columns with the same types as in block. */
|
||||
MutableColumns cloneEmptyColumns() const;
|
||||
|
||||
/** Get columns from block for mutation. */
|
||||
/** Get columns from block for mutation. Columns in block will be nullptr. */
|
||||
MutableColumns mutateColumns() const;
|
||||
|
||||
/** Replace columns in a block */
|
||||
|
@ -60,6 +60,7 @@
|
||||
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
|
||||
#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
|
||||
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
|
||||
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
|
||||
|
||||
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
|
||||
#define DBMS_TCP_PROTOCOL_VERSION 54226
|
||||
@ -74,8 +75,18 @@
|
||||
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
|
||||
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
|
||||
|
||||
#define ALWAYS_INLINE __attribute__((__always_inline__))
|
||||
#define NO_INLINE __attribute__((__noinline__))
|
||||
// more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html
|
||||
|
||||
#if defined(_MSC_VER)
|
||||
#define ALWAYS_INLINE __forceinline
|
||||
#define NO_INLINE static __declspec(noinline)
|
||||
#define MAY_ALIAS
|
||||
#else
|
||||
#define ALWAYS_INLINE __attribute__((__always_inline__))
|
||||
#define NO_INLINE __attribute__((__noinline__))
|
||||
#define MAY_ALIAS __attribute__((__may_alias__))
|
||||
#endif
|
||||
|
||||
|
||||
#define PLATFORM_NOT_SUPPORTED "The only supported platforms are x86_64 and AArch64 (work in progress)"
|
||||
|
||||
|
@ -198,14 +198,14 @@ public:
|
||||
template <typename T> T & get()
|
||||
{
|
||||
using TWithoutRef = std::remove_reference_t<T>;
|
||||
TWithoutRef * __attribute__((__may_alias__)) ptr = reinterpret_cast<TWithoutRef*>(&storage);
|
||||
TWithoutRef * MAY_ALIAS ptr = reinterpret_cast<TWithoutRef*>(&storage);
|
||||
return *ptr;
|
||||
};
|
||||
|
||||
template <typename T> const T & get() const
|
||||
{
|
||||
using TWithoutRef = std::remove_reference_t<T>;
|
||||
const TWithoutRef * __attribute__((__may_alias__)) ptr = reinterpret_cast<const TWithoutRef*>(&storage);
|
||||
const TWithoutRef * MAY_ALIAS ptr = reinterpret_cast<const TWithoutRef*>(&storage);
|
||||
return *ptr;
|
||||
};
|
||||
|
||||
@ -340,7 +340,7 @@ private:
|
||||
void createConcrete(T && x)
|
||||
{
|
||||
using JustT = std::decay_t<T>;
|
||||
JustT * __attribute__((__may_alias__)) ptr = reinterpret_cast<JustT *>(&storage);
|
||||
JustT * MAY_ALIAS ptr = reinterpret_cast<JustT *>(&storage);
|
||||
new (ptr) JustT(std::forward<T>(x));
|
||||
which = TypeToEnum<JustT>::value;
|
||||
}
|
||||
@ -350,7 +350,7 @@ private:
|
||||
void assignConcrete(T && x)
|
||||
{
|
||||
using JustT = std::decay_t<T>;
|
||||
JustT * __attribute__((__may_alias__)) ptr = reinterpret_cast<JustT *>(&storage);
|
||||
JustT * MAY_ALIAS ptr = reinterpret_cast<JustT *>(&storage);
|
||||
*ptr = std::forward<T>(x);
|
||||
}
|
||||
|
||||
@ -398,7 +398,7 @@ private:
|
||||
|
||||
void create(const char * data, size_t size)
|
||||
{
|
||||
String * __attribute__((__may_alias__)) ptr = reinterpret_cast<String*>(&storage);
|
||||
String * MAY_ALIAS ptr = reinterpret_cast<String*>(&storage);
|
||||
new (ptr) String(data, size);
|
||||
which = Types::String;
|
||||
}
|
||||
@ -434,7 +434,7 @@ private:
|
||||
template <typename T>
|
||||
void destroy()
|
||||
{
|
||||
T * __attribute__((__may_alias__)) ptr = reinterpret_cast<T*>(&storage);
|
||||
T * MAY_ALIAS ptr = reinterpret_cast<T*>(&storage);
|
||||
ptr->~T();
|
||||
}
|
||||
};
|
||||
|
@ -76,7 +76,7 @@ Block ColumnGathererStream::readImpl()
|
||||
return Block();
|
||||
|
||||
output_block = Block{column.cloneEmpty()};
|
||||
MutableColumnPtr output_column = output_block.getByPosition(0).column->mutate();
|
||||
MutableColumnPtr output_column = output_block.getByPosition(0).column->assumeMutable();
|
||||
output_column->gather(*this);
|
||||
if (!output_column->empty())
|
||||
output_block.getByPosition(0).column = std::move(output_column);
|
||||
|
@ -221,7 +221,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
|
||||
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i] = source_blocks[source_num]->getByPosition(i).column->mutate();
|
||||
merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate();
|
||||
|
||||
// std::cerr << "copied columns\n";
|
||||
|
||||
@ -233,7 +233,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
auto & column = merged_columns[i];
|
||||
column = column->cut(0, merged_rows);
|
||||
column = (*column->cut(0, merged_rows)).mutate();
|
||||
}
|
||||
|
||||
cancel(false);
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <sys/ioctl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <port/unistd.h>
|
||||
#include <DataStreams/PrettyBlockOutputStream.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <DataStreams/PushingToViewsBlockOutputStream.h>
|
||||
#include <DataStreams/SquashingBlockInputStream.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
|
||||
|
||||
@ -35,7 +36,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
|
||||
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
|
||||
|
||||
auto query = materialized_view.getInnerQuery();
|
||||
auto out = std::make_shared<PushingToViewsBlockOutputStream>(
|
||||
BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>(
|
||||
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr());
|
||||
views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)});
|
||||
}
|
||||
@ -66,8 +67,19 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
BlockInputStreamPtr from = std::make_shared<OneBlockInputStream>(block);
|
||||
InterpreterSelectQuery select(view.query, *views_context, {}, QueryProcessingStage::Complete, 0, from);
|
||||
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
||||
copyData(*data, *view.out);
|
||||
BlockInputStreamPtr in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
||||
/// Squashing is needed here because the materialized view query can generate a lot of blocks
|
||||
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
|
||||
/// and two-level aggregation is triggered).
|
||||
in = std::make_shared<SquashingBlockInputStream>(
|
||||
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
|
||||
|
||||
in->readPrefix();
|
||||
|
||||
while (Block result_block = in->read())
|
||||
view.out->write(result_block);
|
||||
|
||||
in->readSuffix();
|
||||
}
|
||||
catch (Exception & ex)
|
||||
{
|
||||
|
@ -29,18 +29,27 @@ public:
|
||||
{
|
||||
if (output)
|
||||
output->flush();
|
||||
|
||||
for (auto & view : views)
|
||||
view.out->flush();
|
||||
}
|
||||
|
||||
void writePrefix() override
|
||||
{
|
||||
if (output)
|
||||
output->writePrefix();
|
||||
|
||||
for (auto & view : views)
|
||||
view.out->writePrefix();
|
||||
}
|
||||
|
||||
void writeSuffix() override
|
||||
{
|
||||
if (output)
|
||||
output->writeSuffix();
|
||||
|
||||
for (auto & view : views)
|
||||
view.out->writeSuffix();
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -62,7 +62,7 @@ void SquashingTransform::append(Block && block)
|
||||
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
{
|
||||
MutableColumnPtr mutable_column = accumulated_block.getByPosition(i).column->mutate();
|
||||
MutableColumnPtr mutable_column = (*std::move(accumulated_block.getByPosition(i).column)).mutate();
|
||||
mutable_column->insertRangeFrom(*block.getByPosition(i).column, 0, rows);
|
||||
accumulated_block.getByPosition(i).column = std::move(mutable_column);
|
||||
}
|
||||
|
@ -259,11 +259,11 @@ Block SummingSortedBlockInputStream::readImpl()
|
||||
if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
|
||||
{
|
||||
size_t tuple_size = desc.column_numbers.size();
|
||||
Columns tuple_columns(tuple_size);
|
||||
MutableColumns tuple_columns(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column;
|
||||
tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column->assumeMutable();
|
||||
|
||||
desc.merged_column = ColumnTuple::create(tuple_columns);
|
||||
desc.merged_column = ColumnTuple::create(std::move(tuple_columns));
|
||||
}
|
||||
else
|
||||
desc.merged_column = header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty();
|
||||
|
@ -329,10 +329,10 @@ void DataTypeTuple::deserializeBinaryBulkWithMultipleStreams(
|
||||
MutableColumnPtr DataTypeTuple::createColumn() const
|
||||
{
|
||||
size_t size = elems.size();
|
||||
Columns tuple_columns(size);
|
||||
MutableColumns tuple_columns(size);
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
tuple_columns[i] = elems[i]->createColumn();
|
||||
return ColumnTuple::create(tuple_columns);
|
||||
return ColumnTuple::create(std::move(tuple_columns));
|
||||
}
|
||||
|
||||
Field DataTypeTuple::getDefault() const
|
||||
|
@ -3,15 +3,21 @@
|
||||
#include <Interpreters/ExternalDictionaries.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Parsers/IAST.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
extern const int SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context)
|
||||
@ -153,11 +159,54 @@ time_t DatabaseDictionary::getTableMetadataModificationTime(
|
||||
return static_cast<time_t>(0);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseDictionary::getCreateQuery(
|
||||
const Context &,
|
||||
const String &) const
|
||||
ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const Context & context,
|
||||
const String & table_name, bool throw_on_error) const
|
||||
{
|
||||
throw Exception("There is no CREATE TABLE query for DatabaseDictionary tables", ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
|
||||
String query;
|
||||
{
|
||||
WriteBufferFromString buffer(query);
|
||||
|
||||
const auto & dictionaries = context.getExternalDictionaries();
|
||||
auto dictionary = throw_on_error ? dictionaries.getDictionary(table_name)
|
||||
: dictionaries.tryGetDictionary(table_name);
|
||||
|
||||
auto names_and_types = StorageDictionary::getNamesAndTypes(dictionary->getStructure());
|
||||
buffer << "CREATE TABLE " << backQuoteIfNeed(name) << '.' << backQuoteIfNeed(table_name) << " (";
|
||||
buffer << StorageDictionary::generateNamesAndTypesDescription(names_and_types.begin(), names_and_types.end());
|
||||
buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")";
|
||||
}
|
||||
|
||||
ParserCreateQuery parser;
|
||||
const char * pos = query.data();
|
||||
std::string error_message;
|
||||
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message,
|
||||
/* hilite = */ false, "", /* allow_multi_statements = */ false);
|
||||
|
||||
if (!ast && throw_on_error)
|
||||
throw Exception(error_message, ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseDictionary::getCreateTableQuery(const Context & context, const String & table_name) const
|
||||
{
|
||||
return getCreateTableQueryImpl(context, table_name, true);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseDictionary::tryGetCreateTableQuery(const Context & context, const String & table_name) const
|
||||
{
|
||||
return getCreateTableQueryImpl(context, table_name, false);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseDictionary::getCreateDatabaseQuery(const Context & /*context*/) const
|
||||
{
|
||||
String query;
|
||||
{
|
||||
WriteBufferFromString buffer(query);
|
||||
buffer << "CREATE DATABASE " << backQuoteIfNeed(name) << " ENGINE = Dictionary";
|
||||
}
|
||||
ParserCreateQuery parser;
|
||||
return parseQuery(parser, query.data(), query.data() + query.size(), "");
|
||||
}
|
||||
|
||||
void DatabaseDictionary::shutdown()
|
||||
|
@ -22,16 +22,6 @@ class ExternalDictionaries;
|
||||
*/
|
||||
class DatabaseDictionary : public IDatabase
|
||||
{
|
||||
private:
|
||||
const String name;
|
||||
mutable std::mutex mutex;
|
||||
const ExternalDictionaries & external_dictionaries;
|
||||
std::unordered_set<String> deleted_tables;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
Tables loadTables();
|
||||
|
||||
public:
|
||||
DatabaseDictionary(const String & name_, const Context & context);
|
||||
|
||||
@ -86,12 +76,30 @@ public:
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
ASTPtr getCreateQuery(
|
||||
ASTPtr getCreateTableQuery(
|
||||
const Context & context,
|
||||
const String & table_name) const override;
|
||||
|
||||
ASTPtr tryGetCreateTableQuery(
|
||||
const Context & context,
|
||||
const String & table_name) const override;
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
void shutdown() override;
|
||||
void drop() override;
|
||||
|
||||
private:
|
||||
const String name;
|
||||
mutable std::mutex mutex;
|
||||
const ExternalDictionaries & external_dictionaries;
|
||||
std::unordered_set<String> deleted_tables;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
Tables loadTables();
|
||||
|
||||
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -118,13 +118,18 @@ time_t DatabaseMemory::getTableMetadataModificationTime(
|
||||
return static_cast<time_t>(0);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseMemory::getCreateQuery(
|
||||
ASTPtr DatabaseMemory::getCreateTableQuery(
|
||||
const Context &,
|
||||
const String &) const
|
||||
{
|
||||
throw Exception("There is no CREATE TABLE query for DatabaseMemory tables", ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseMemory::getCreateDatabaseQuery(const Context &) const
|
||||
{
|
||||
throw Exception("There is no CREATE DATABASE query for DatabaseMemory", ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
|
||||
}
|
||||
|
||||
void DatabaseMemory::shutdown()
|
||||
{
|
||||
/// You can not hold a lock during shutdown.
|
||||
|
@ -77,9 +77,10 @@ public:
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
ASTPtr getCreateQuery(
|
||||
const Context & context,
|
||||
const String & table_name) const override;
|
||||
ASTPtr getCreateTableQuery(const Context & context, const String & table_name) const override;
|
||||
ASTPtr tryGetCreateTableQuery(const Context &, const String &) const override { return nullptr; }
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
void shutdown() override;
|
||||
void drop() override;
|
||||
|
@ -31,6 +31,7 @@ namespace ErrorCodes
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
extern const int SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
|
||||
@ -45,6 +46,12 @@ namespace detail
|
||||
{
|
||||
return base_path + (endsWith(base_path, "/") ? "" : "/") + escapeForFileName(table_name) + ".sql";
|
||||
}
|
||||
|
||||
String getDatabaseMetadataPath(const String & base_path)
|
||||
{
|
||||
return (endsWith(base_path, "/") ? base_path.substr(0, base_path.size() - 1) : base_path) + ".sql";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void loadTable(
|
||||
@ -329,19 +336,42 @@ void DatabaseOrdinary::removeTable(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static ASTPtr getCreateQueryImpl(const String & path, const String & table_name)
|
||||
static ASTPtr getQueryFromMetadata(const String & metadata_path, bool throw_on_error = true)
|
||||
{
|
||||
String table_metadata_path = detail::getTableMetadataPath(path, table_name);
|
||||
if (!Poco::File(metadata_path).exists())
|
||||
return nullptr;
|
||||
|
||||
String query;
|
||||
|
||||
{
|
||||
ReadBufferFromFile in(table_metadata_path, 4096);
|
||||
ReadBufferFromFile in(metadata_path, 4096);
|
||||
readStringUntilEOF(query, in);
|
||||
}
|
||||
|
||||
ParserCreateQuery parser;
|
||||
return parseQuery(parser, query.data(), query.data() + query.size(), "in file " + table_metadata_path);
|
||||
const char * pos = query.data();
|
||||
std::string error_message;
|
||||
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message, /* hilite = */ false,
|
||||
"in file " + metadata_path, /* allow_multi_statements = */ false);
|
||||
|
||||
if (!ast && throw_on_error)
|
||||
throw Exception(error_message, ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
static ASTPtr getCreateQueryFromMetadata(const String & metadata_path, const String & database, bool throw_on_error)
|
||||
{
|
||||
ASTPtr ast = getQueryFromMetadata(metadata_path, throw_on_error);
|
||||
|
||||
if (ast)
|
||||
{
|
||||
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
||||
ast_create_query.attach = false;
|
||||
ast_create_query.database = database;
|
||||
}
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
|
||||
@ -378,7 +408,9 @@ void DatabaseOrdinary::renameTable(
|
||||
throw Exception{e};
|
||||
}
|
||||
|
||||
ASTPtr ast = getCreateQueryImpl(metadata_path, table_name);
|
||||
ASTPtr ast = getQueryFromMetadata(detail::getTableMetadataPath(metadata_path, table_name));
|
||||
if (!ast)
|
||||
throw Exception("There is no metadata file for table " + table_name, ErrorCodes::FILE_DOESNT_EXIST);
|
||||
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
||||
ast_create_query.table = to_table_name;
|
||||
|
||||
@ -405,28 +437,51 @@ time_t DatabaseOrdinary::getTableMetadataModificationTime(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ASTPtr DatabaseOrdinary::getCreateQuery(
|
||||
const Context & context,
|
||||
const String & table_name) const
|
||||
ASTPtr DatabaseOrdinary::getCreateTableQueryImpl(const Context & context,
|
||||
const String & table_name, bool throw_on_error) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
try
|
||||
{
|
||||
ast = getCreateQueryImpl(metadata_path, table_name);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// Handle system.* tables for which there are no table.sql files
|
||||
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST && tryGetTable(context, table_name) != nullptr)
|
||||
throw Exception("There is no CREATE TABLE query for table " + table_name, ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
|
||||
|
||||
throw;
|
||||
auto table_metadata_path = detail::getTableMetadataPath(metadata_path, table_name);
|
||||
ast = getCreateQueryFromMetadata(table_metadata_path, name, throw_on_error);
|
||||
if (!ast && throw_on_error)
|
||||
{
|
||||
/// Handle system.* tables for which there are no table.sql files.
|
||||
bool has_table = tryGetTable(context, table_name) != nullptr;
|
||||
|
||||
auto msg = has_table
|
||||
? "There is no CREATE TABLE query for table "
|
||||
: "There is no metadata file for table ";
|
||||
|
||||
throw Exception(msg + table_name, ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
|
||||
}
|
||||
|
||||
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
|
||||
ast_create_query.attach = false;
|
||||
ast_create_query.database = name;
|
||||
return ast;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOrdinary::getCreateTableQuery(const Context & context, const String & table_name) const
|
||||
{
|
||||
return getCreateTableQueryImpl(context, table_name, true);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOrdinary::tryGetCreateTableQuery(const Context & context, const String & table_name) const
|
||||
{
|
||||
return getCreateTableQueryImpl(context, table_name, false);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOrdinary::getCreateDatabaseQuery(const Context & /*context*/) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
|
||||
auto database_metadata_path = detail::getDatabaseMetadataPath(metadata_path);
|
||||
ast = getCreateQueryFromMetadata(database_metadata_path, name, true);
|
||||
if (!ast)
|
||||
{
|
||||
/// Handle databases (such as default) for which there are no database.sql files.
|
||||
String query = "CREATE DATABASE " + backQuoteIfNeed(name) + " ENGINE = Ordinary";
|
||||
ParserCreateQuery parser;
|
||||
ast = parseQuery(parser, query.data(), query.data() + query.size(), "");
|
||||
}
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
@ -52,10 +52,16 @@ public:
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
ASTPtr getCreateQuery(
|
||||
ASTPtr getCreateTableQuery(
|
||||
const Context & context,
|
||||
const String & table_name) const override;
|
||||
|
||||
ASTPtr tryGetCreateTableQuery(
|
||||
const Context & context,
|
||||
const String & table_name) const override;
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
String getDataPath() const override;
|
||||
String getMetadataPath() const override;
|
||||
String getTableMetadataPath(const String & table_name) const override;
|
||||
@ -65,6 +71,8 @@ public:
|
||||
|
||||
private:
|
||||
void startupTables(ThreadPool * thread_pool);
|
||||
|
||||
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -122,9 +122,15 @@ public:
|
||||
const String & name) = 0;
|
||||
|
||||
/// Get the CREATE TABLE query for the table. It can also provide information for detached tables for which there is metadata.
|
||||
virtual ASTPtr getCreateQuery(
|
||||
const Context & context,
|
||||
const String & name) const = 0;
|
||||
virtual ASTPtr tryGetCreateTableQuery(const Context & context, const String & name) const = 0;
|
||||
|
||||
virtual ASTPtr getCreateTableQuery(const Context & context, const String & name) const
|
||||
{
|
||||
return tryGetCreateTableQuery(context, name);
|
||||
}
|
||||
|
||||
/// Get the CREATE DATABASE query for current database.
|
||||
virtual ASTPtr getCreateDatabaseQuery(const Context & context) const = 0;
|
||||
|
||||
/// Returns path for persistent data storage if the database supports it, empty string otherwise
|
||||
virtual String getDataPath() const { return {}; }
|
||||
|
@ -289,7 +289,7 @@ void ComplexKeyHashedDictionary::updateData()
|
||||
for (const auto attribute_idx : ext::range(0, keys_size + attributes_size))
|
||||
{
|
||||
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
|
||||
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
|
||||
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->assumeMutable();
|
||||
saved_column->insertRangeFrom(update_column, 0, update_column.size());
|
||||
}
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ void ExternalResultDescription::init(const Block & sample_block_)
|
||||
/// If default value for column was not provided, use default from data type.
|
||||
if (sample_columns.back()->empty())
|
||||
{
|
||||
MutableColumnPtr mutable_column = sample_columns.back()->mutate();
|
||||
MutableColumnPtr mutable_column = (*std::move(sample_columns.back())).mutate();
|
||||
column.type->insertDefaultInto(*mutable_column);
|
||||
sample_columns.back() = std::move(mutable_column);
|
||||
}
|
||||
|
@ -316,7 +316,7 @@ void FlatDictionary::updateData()
|
||||
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
|
||||
{
|
||||
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
|
||||
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
|
||||
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->assumeMutable();
|
||||
saved_column->insertRangeFrom(update_column, 0, update_column.size());
|
||||
}
|
||||
}
|
||||
|
@ -310,7 +310,7 @@ void HashedDictionary::updateData()
|
||||
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
|
||||
{
|
||||
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
|
||||
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
|
||||
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->assumeMutable();
|
||||
saved_column->insertRangeFrom(update_column, 0, update_column.size());
|
||||
}
|
||||
}
|
||||
|
@ -592,7 +592,7 @@ private:
|
||||
const ColumnPtr & result_column = temporary_block.getByPosition(3).column;
|
||||
if (result_column->isColumnNullable())
|
||||
{
|
||||
MutableColumnPtr mutable_result_column = result_column->mutate();
|
||||
MutableColumnPtr mutable_result_column = (*std::move(result_column)).mutate();
|
||||
static_cast<ColumnNullable &>(*mutable_result_column).applyNullMap(static_cast<const ColumnNullable &>(*arg_cond.column));
|
||||
block.getByPosition(result).column = std::move(mutable_result_column);
|
||||
return true;
|
||||
@ -744,7 +744,8 @@ private:
|
||||
{
|
||||
if (arg_else.column->isColumnNullable())
|
||||
{
|
||||
auto result_column = arg_else.column->mutate();
|
||||
auto arg_else_column = arg_else.column;
|
||||
auto result_column = (*std::move(arg_else_column)).mutate();
|
||||
static_cast<ColumnNullable &>(*result_column).applyNullMap(static_cast<const ColumnUInt8 &>(*arg_cond.column));
|
||||
block.getByPosition(result).column = std::move(result_column);
|
||||
}
|
||||
@ -785,7 +786,8 @@ private:
|
||||
|
||||
if (arg_then.column->isColumnNullable())
|
||||
{
|
||||
auto result_column = arg_then.column->mutate();
|
||||
auto arg_then_column = arg_then.column;
|
||||
auto result_column = (*std::move(arg_then_column)).mutate();
|
||||
static_cast<ColumnNullable &>(*result_column).applyNegatedNullMap(static_cast<const ColumnUInt8 &>(*arg_cond.column));
|
||||
block.getByPosition(result).column = std::move(result_column);
|
||||
}
|
||||
|
@ -936,7 +936,7 @@ public:
|
||||
}
|
||||
|
||||
/// Put all the necessary columns multiplied by the sizes of arrays into the block.
|
||||
auto replicated_column_function_ptr = column_function->replicate(column_first_array->getOffsets());
|
||||
auto replicated_column_function_ptr = (*column_function->replicate(column_first_array->getOffsets())).mutate();
|
||||
auto * replicated_column_function = typeid_cast<ColumnFunction *>(replicated_column_function_ptr.get());
|
||||
replicated_column_function->appendArguments(arrays);
|
||||
|
||||
|
@ -41,7 +41,8 @@ std::unique_ptr<IArraySink> createArraySink(ColumnArray & col, size_t column_siz
|
||||
using Creator = ApplyTypeListForClass<ArraySinkCreator, TypeListNumbers>::Type;
|
||||
if (auto column_nullable = typeid_cast<ColumnNullable *>(&col.getData()))
|
||||
{
|
||||
auto column = ColumnArray::create(column_nullable->getNestedColumnPtr(), col.getOffsetsPtr());
|
||||
auto column = ColumnArray::create(column_nullable->getNestedColumnPtr()->assumeMutable(),
|
||||
col.getOffsetsPtr()->assumeMutable());
|
||||
return Creator::create(*column, &column_nullable->getNullMapData(), column_size);
|
||||
}
|
||||
return Creator::create(col, nullptr, column_size);
|
||||
|
@ -63,7 +63,7 @@ ColumnPtr wrapInNullable(const ColumnPtr & src, Block & block, const ColumnNumbe
|
||||
}
|
||||
else
|
||||
{
|
||||
MutableColumnPtr mutable_result_null_map_column = result_null_map_column->mutate();
|
||||
MutableColumnPtr mutable_result_null_map_column = (*std::move(result_null_map_column)).mutate();
|
||||
|
||||
NullMap & result_null_map = static_cast<ColumnUInt8 &>(*mutable_result_null_map_column).getData();
|
||||
const NullMap & src_null_map = static_cast<const ColumnUInt8 &>(*null_map_column).getData();
|
||||
|
@ -1,4 +1,4 @@
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__))
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
|
||||
|
||||
#include <IO/ReadBufferAIO.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__))
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
|
||||
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
@ -8,7 +8,6 @@
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/AIO.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
#include <string>
|
||||
#include <limits>
|
||||
#include <unistd.h>
|
||||
|
@ -1,14 +1,10 @@
|
||||
#include <errno.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include <port/unistd.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
|
||||
|
@ -52,6 +52,8 @@ inline char parseEscapeSequence(char c)
|
||||
return '\a';
|
||||
case 'b':
|
||||
return '\b';
|
||||
case 'e':
|
||||
return '\x1B'; /// \e escape sequence is non standard for C and C++ but supported by gcc and clang.
|
||||
case 'f':
|
||||
return '\f';
|
||||
case 'n':
|
||||
|
@ -1,4 +1,4 @@
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__))
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
|
||||
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__))
|
||||
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
|
||||
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include <unistd.h>
|
||||
#include <port/unistd.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <IO/createReadBufferFromFileBase.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__)
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
|
||||
#include <IO/ReadBufferAIO.h>
|
||||
#endif
|
||||
#include <Common/ProfileEvents.h>
|
||||
@ -14,7 +14,7 @@ namespace ProfileEvents
|
||||
|
||||
namespace DB
|
||||
{
|
||||
#if defined(__APPLE__) || defined(__FreeBSD__)
|
||||
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
@ -31,7 +31,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
|
||||
}
|
||||
else
|
||||
{
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__)
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
|
||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
|
||||
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
|
||||
#else
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <IO/createWriteBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__)
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
|
||||
#include <IO/WriteBufferAIO.h>
|
||||
#endif
|
||||
#include <Common/ProfileEvents.h>
|
||||
@ -15,7 +15,7 @@ namespace ProfileEvents
|
||||
namespace DB
|
||||
{
|
||||
|
||||
#if defined(__APPLE__) || defined(__FreeBSD__)
|
||||
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
@ -33,7 +33,7 @@ WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & file
|
||||
}
|
||||
else
|
||||
{
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__)
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
|
||||
return new WriteBufferAIO(filename_, buffer_size_, flags_, mode, existing_memory_);
|
||||
#else
|
||||
|
@ -1,14 +1,13 @@
|
||||
#include <IO/ReadBufferAIO.h>
|
||||
#include <Core/Defines.h>
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <cstdlib>
|
||||
#include <unistd.h>
|
||||
#include <port/unistd.h>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
@ -1,5 +1,3 @@
|
||||
#include <unistd.h>
|
||||
|
||||
#include <Interpreters/ClientInfo.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
@ -8,6 +6,7 @@
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/getFQDNOrHostName.h>
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <port/unistd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -82,6 +82,7 @@ namespace ErrorCodes
|
||||
extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
|
||||
extern const int SESSION_NOT_FOUND;
|
||||
extern const int SESSION_IS_LOCKED;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
}
|
||||
|
||||
|
||||
@ -912,17 +913,17 @@ DatabasePtr Context::detachDatabase(const String & database_name)
|
||||
}
|
||||
|
||||
|
||||
ASTPtr Context::getCreateQuery(const String & database_name, const String & table_name) const
|
||||
ASTPtr Context::getCreateTableQuery(const String & database_name, const String & table_name) const
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
String db = resolveDatabase(database_name, current_database);
|
||||
assertDatabaseExists(db);
|
||||
|
||||
return shared->databases[db]->getCreateQuery(*this, table_name);
|
||||
return shared->databases[db]->getCreateTableQuery(*this, table_name);
|
||||
}
|
||||
|
||||
ASTPtr Context::getCreateExternalQuery(const String & table_name) const
|
||||
ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const
|
||||
{
|
||||
TableAndCreateASTs::const_iterator jt = external_tables.find(table_name);
|
||||
if (external_tables.end() == jt)
|
||||
@ -931,6 +932,15 @@ ASTPtr Context::getCreateExternalQuery(const String & table_name) const
|
||||
return jt->second.second;
|
||||
}
|
||||
|
||||
ASTPtr Context::getCreateDatabaseQuery(const String & database_name) const
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
String db = resolveDatabase(database_name, current_database);
|
||||
assertDatabaseExists(db);
|
||||
|
||||
return shared->databases[db]->getCreateDatabaseQuery(*this);
|
||||
}
|
||||
|
||||
Settings Context::getSettings() const
|
||||
{
|
||||
|
@ -241,8 +241,9 @@ public:
|
||||
UInt16 getTCPPort() const;
|
||||
|
||||
/// Get query for the CREATE table.
|
||||
ASTPtr getCreateQuery(const String & database_name, const String & table_name) const;
|
||||
ASTPtr getCreateExternalQuery(const String & table_name) const;
|
||||
ASTPtr getCreateTableQuery(const String & database_name, const String & table_name) const;
|
||||
ASTPtr getCreateExternalTableQuery(const String & table_name) const;
|
||||
ASTPtr getCreateDatabaseQuery(const String & database_name) const;
|
||||
|
||||
const DatabasePtr getDatabase(const String & database_name) const;
|
||||
DatabasePtr getDatabase(const String & database_name);
|
||||
|
@ -31,6 +31,11 @@ public:
|
||||
return std::static_pointer_cast<IDictionaryBase>(getLoadable(name));
|
||||
}
|
||||
|
||||
DictPtr tryGetDictionary(const std::string & name) const
|
||||
{
|
||||
return std::static_pointer_cast<IDictionaryBase>(tryGetLoadable(name));
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
std::unique_ptr<IExternalLoadable> create(const std::string & name, const Configuration & config,
|
||||
|
@ -385,21 +385,35 @@ void ExternalLoader::reload(const std::string & name)
|
||||
throw Exception("Failed to load " + object_name + " '" + name + "' during the reload process", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
ExternalLoader::LoadablePtr ExternalLoader::getLoadable(const std::string & name) const
|
||||
ExternalLoader::LoadablePtr ExternalLoader::getLoadableImpl(const std::string & name, bool throw_on_error) const
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock{map_mutex};
|
||||
|
||||
const auto it = loadable_objects.find(name);
|
||||
if (it == std::end(loadable_objects))
|
||||
throw Exception("No such " + object_name + ": " + name, ErrorCodes::BAD_ARGUMENTS);
|
||||
{
|
||||
if (throw_on_error)
|
||||
throw Exception("No such " + object_name + ": " + name, ErrorCodes::BAD_ARGUMENTS);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!it->second.loadable)
|
||||
it->second.exception ? std::rethrow_exception(it->second.exception) :
|
||||
throw Exception{object_name + " '" + name + "' is not loaded", ErrorCodes::LOGICAL_ERROR};
|
||||
if (!it->second.loadable && throw_on_error)
|
||||
it->second.exception ? std::rethrow_exception(it->second.exception)
|
||||
: throw Exception{object_name + " '" + name + "' is not loaded", ErrorCodes::LOGICAL_ERROR};
|
||||
|
||||
return it->second.loadable;
|
||||
}
|
||||
|
||||
ExternalLoader::LoadablePtr ExternalLoader::getLoadable(const std::string & name) const
|
||||
{
|
||||
return getLoadableImpl(name, true);
|
||||
}
|
||||
|
||||
ExternalLoader::LoadablePtr ExternalLoader::tryGetLoadable(const std::string & name) const
|
||||
{
|
||||
return getLoadableImpl(name, false);
|
||||
}
|
||||
|
||||
ExternalLoader::LockedObjectsMap ExternalLoader::getObjectsMap() const
|
||||
{
|
||||
return LockedObjectsMap(map_mutex, loadable_objects);
|
||||
|
@ -104,6 +104,7 @@ public:
|
||||
void reload(const std::string & name);
|
||||
|
||||
LoadablePtr getLoadable(const std::string & name) const;
|
||||
LoadablePtr tryGetLoadable(const std::string & name) const;
|
||||
|
||||
protected:
|
||||
virtual std::unique_ptr<IExternalLoadable> create(const std::string & name, const Configuration & config,
|
||||
@ -172,6 +173,8 @@ private:
|
||||
void reloadAndUpdate(bool throw_on_error = false);
|
||||
|
||||
void reloadPeriodically();
|
||||
|
||||
LoadablePtr getLoadableImpl(const std::string & name, bool throw_on_error) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -425,7 +425,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
|
||||
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
|
||||
String as_table_name = create.as_table;
|
||||
|
||||
ASTPtr as_create_ptr = context.getCreateQuery(as_database_name, as_table_name);
|
||||
ASTPtr as_create_ptr = context.getCreateTableQuery(as_database_name, as_table_name);
|
||||
const auto & as_create = typeid_cast<const ASTCreateQuery &>(*as_create_ptr);
|
||||
|
||||
if (as_create.is_view)
|
||||
@ -454,7 +454,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
if (create.attach && !create.storage && !create.columns)
|
||||
{
|
||||
// Table SQL definition is available even if the table is detached
|
||||
auto query = context.getCreateQuery(database_name, table_name);
|
||||
auto query = context.getCreateTableQuery(database_name, table_name);
|
||||
auto & as_create = typeid_cast<const ASTCreateQuery &>(*query);
|
||||
create = as_create; // Copy the saved create query, but use ATTACH instead of CREATE
|
||||
create.attach = true;
|
||||
|
@ -107,7 +107,11 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
|
||||
{
|
||||
return std::make_unique<InterpreterExistsQuery>(query, context);
|
||||
}
|
||||
else if (typeid_cast<ASTShowCreateQuery *>(query.get()))
|
||||
else if (typeid_cast<ASTShowCreateTableQuery *>(query.get()))
|
||||
{
|
||||
return std::make_unique<InterpreterShowCreateQuery>(query, context);
|
||||
}
|
||||
else if (typeid_cast<ASTShowCreateDatabaseQuery *>(query.get()))
|
||||
{
|
||||
return std::make_unique<InterpreterShowCreateQuery>(query, context);
|
||||
}
|
||||
|
@ -39,13 +39,18 @@ Block InterpreterShowCreateQuery::getSampleBlock()
|
||||
|
||||
BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
|
||||
{
|
||||
const ASTShowCreateQuery & ast = typeid_cast<const ASTShowCreateQuery &>(*query_ptr);
|
||||
const auto & ast = dynamic_cast<const ASTQueryWithTableAndOutput &>(*query_ptr);
|
||||
|
||||
if (ast.temporary && !ast.database.empty())
|
||||
throw Exception("Temporary databases are not possible.", ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
ASTPtr create_query = (ast.temporary ? context.getCreateExternalQuery(ast.table) :
|
||||
context.getCreateQuery(ast.database, ast.table));
|
||||
ASTPtr create_query;
|
||||
if (ast.temporary)
|
||||
create_query = context.getCreateExternalTableQuery(ast.table);
|
||||
else if (ast.table.empty())
|
||||
create_query = context.getCreateDatabaseQuery(ast.database);
|
||||
else
|
||||
create_query = context.getCreateTableQuery(ast.database, ast.table);
|
||||
|
||||
if (!create_query && ast.temporary)
|
||||
throw Exception("Unable to show the create query of " + ast.table + ". Maybe it was created by the system.", ErrorCodes::THERE_IS_NO_QUERY);
|
||||
|
@ -31,7 +31,7 @@ void extractNestedColumnsAndNullMap(ColumnRawPtrs & key_columns, ColumnPtr & nul
|
||||
}
|
||||
else
|
||||
{
|
||||
MutableColumnPtr mutable_null_map_holder = null_map_holder->mutate();
|
||||
MutableColumnPtr mutable_null_map_holder = (*std::move(null_map_holder)).mutate();
|
||||
|
||||
PaddedPODArray<UInt8> & mutable_null_map = static_cast<ColumnUInt8 &>(*mutable_null_map_holder).getData();
|
||||
const PaddedPODArray<UInt8> & other_null_map = column_nullable.getNullMapData();
|
||||
|
@ -108,7 +108,7 @@ bool PartLog::addNewPartToTheLog(Context & context, const MergeTreeDataPart & pa
|
||||
elem.table_name = part.storage.getTableName();
|
||||
elem.part_name = part.name;
|
||||
|
||||
elem.bytes_compressed_on_disk = part.size_in_bytes;
|
||||
elem.bytes_compressed_on_disk = part.bytes_on_disk;
|
||||
elem.rows = part.rows_count;
|
||||
|
||||
elem.error = static_cast<UInt16>(execution_status.code);
|
||||
|
@ -1,8 +1,6 @@
|
||||
#include <Common/Config/ConfigProcessor.h>
|
||||
#include <Interpreters/SecurityManager.h>
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
@ -10,8 +8,8 @@
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
#include <cstdlib>
|
||||
#include <port/unistd.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
@ -19,6 +19,7 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
|
||||
ParserKeyword s_desc("DESC");
|
||||
ParserKeyword s_show("SHOW");
|
||||
ParserKeyword s_create("CREATE");
|
||||
ParserKeyword s_database("DATABASE");
|
||||
ParserKeyword s_table("TABLE");
|
||||
ParserToken s_dot(TokenType::Dot);
|
||||
ParserIdentifier name_p;
|
||||
@ -27,6 +28,8 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
|
||||
ASTPtr table;
|
||||
std::shared_ptr<ASTQueryWithTableAndOutput> query;
|
||||
|
||||
bool parse_only_database_name = false;
|
||||
|
||||
if (s_exists.ignore(pos, expected))
|
||||
{
|
||||
query = std::make_shared<ASTExistsQuery>();
|
||||
@ -36,26 +39,40 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
|
||||
if (!s_create.ignore(pos, expected))
|
||||
return false;
|
||||
|
||||
query = std::make_shared<ASTShowCreateQuery>();
|
||||
if (s_database.ignore(pos, expected))
|
||||
{
|
||||
parse_only_database_name = true;
|
||||
query = std::make_shared<ASTShowCreateDatabaseQuery>();
|
||||
}
|
||||
else
|
||||
query = std::make_shared<ASTShowCreateTableQuery>();
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (s_temporary.ignore(pos, expected))
|
||||
query->temporary = true;
|
||||
|
||||
s_table.ignore(pos, expected);
|
||||
|
||||
if (!name_p.parse(pos, table, expected))
|
||||
return false;
|
||||
|
||||
if (s_dot.ignore(pos, expected))
|
||||
if (parse_only_database_name)
|
||||
{
|
||||
database = table;
|
||||
if (!name_p.parse(pos, database, expected))
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (s_temporary.ignore(pos, expected))
|
||||
query->temporary = true;
|
||||
|
||||
s_table.ignore(pos, expected);
|
||||
|
||||
if (!name_p.parse(pos, table, expected))
|
||||
return false;
|
||||
|
||||
if (s_dot.ignore(pos, expected))
|
||||
{
|
||||
database = table;
|
||||
if (!name_p.parse(pos, table, expected))
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (database)
|
||||
|
@ -12,12 +12,18 @@ struct ASTExistsQueryIDAndQueryNames
|
||||
static constexpr auto Query = "EXISTS TABLE";
|
||||
};
|
||||
|
||||
struct ASTShowCreateQueryIDAndQueryNames
|
||||
struct ASTShowCreateTableQueryIDAndQueryNames
|
||||
{
|
||||
static constexpr auto ID = "ShowCreateQuery";
|
||||
static constexpr auto ID = "ShowCreateTableQuery";
|
||||
static constexpr auto Query = "SHOW CREATE TABLE";
|
||||
};
|
||||
|
||||
struct ASTShowCreateDatabaseQueryIDAndQueryNames
|
||||
{
|
||||
static constexpr auto ID = "ShowCreateDatabaseQuery";
|
||||
static constexpr auto Query = "SHOW CREATE DATABASE";
|
||||
};
|
||||
|
||||
struct ASTDescribeQueryExistsQueryIDAndQueryNames
|
||||
{
|
||||
static constexpr auto ID = "DescribeQuery";
|
||||
@ -25,7 +31,17 @@ struct ASTDescribeQueryExistsQueryIDAndQueryNames
|
||||
};
|
||||
|
||||
using ASTExistsQuery = ASTQueryWithTableAndOutputImpl<ASTExistsQueryIDAndQueryNames>;
|
||||
using ASTShowCreateQuery = ASTQueryWithTableAndOutputImpl<ASTShowCreateQueryIDAndQueryNames>;
|
||||
using ASTShowCreateTableQuery = ASTQueryWithTableAndOutputImpl<ASTShowCreateTableQueryIDAndQueryNames>;
|
||||
|
||||
class ASTShowCreateDatabaseQuery : public ASTQueryWithTableAndOutputImpl<ASTShowCreateDatabaseQueryIDAndQueryNames>
|
||||
{
|
||||
protected:
|
||||
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << ASTShowCreateDatabaseQueryIDAndQueryNames::Query
|
||||
<< " " << (settings.hilite ? hilite_none : "") << backQuoteIfNeed(database);
|
||||
}
|
||||
};
|
||||
|
||||
class ASTDescribeQuery : public ASTQueryWithOutput
|
||||
{
|
||||
|
@ -1,31 +1,24 @@
|
||||
#include <unistd.h>
|
||||
#include <port/unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <fcntl.h>
|
||||
#include <signal.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <iomanip>
|
||||
#include <random>
|
||||
#include <pcg_random.hpp>
|
||||
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <common/ThreadPool.h>
|
||||
#include <AggregateFunctions/ReservoirSampler.h>
|
||||
#include <AggregateFunctions/registerAggregateFunctions.h>
|
||||
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
@ -33,13 +26,9 @@
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Client/Connection.h>
|
||||
|
||||
#include "InterruptListener.h"
|
||||
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
#include <unistd.h>
|
||||
#include <port/unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <fcntl.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <map>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <iomanip>
|
||||
@ -10,13 +10,11 @@
|
||||
#include <algorithm>
|
||||
#include <optional>
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
|
||||
#include <common/readline_use.h>
|
||||
#include <common/find_first_symbols.h>
|
||||
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/Exception.h>
|
||||
@ -35,6 +33,7 @@
|
||||
#include <IO/WriteBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <DataStreams/AsynchronousBlockInputStream.h>
|
||||
@ -102,7 +101,6 @@ private:
|
||||
"учшеж", "йгшеж", "дщпщгеж",
|
||||
"q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй"
|
||||
};
|
||||
|
||||
bool is_interactive = true; /// Use either readline interface or batch mode.
|
||||
bool need_render_progress = true; /// Render query execution progress.
|
||||
bool echo_queries = false; /// Print queries before execution in batch mode.
|
||||
@ -112,6 +110,7 @@ private:
|
||||
winsize terminal_size {}; /// Terminal size is needed to render progress bar.
|
||||
|
||||
std::unique_ptr<Connection> connection; /// Connection to DB.
|
||||
String query_id; /// Current query_id.
|
||||
String query; /// Current query.
|
||||
|
||||
String format; /// Query results output format.
|
||||
@ -139,6 +138,8 @@ private:
|
||||
|
||||
String current_profile;
|
||||
|
||||
String prompt_by_server_display_name;
|
||||
|
||||
/// Path to a file containing command history.
|
||||
String history_file;
|
||||
|
||||
@ -154,6 +155,7 @@ private:
|
||||
/// If the last query resulted in exception.
|
||||
bool got_exception = false;
|
||||
String server_version;
|
||||
String server_display_name;
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
@ -168,6 +170,49 @@ private:
|
||||
std::list<ExternalTable> external_tables;
|
||||
|
||||
|
||||
struct ConnectionParameters
|
||||
{
|
||||
String host;
|
||||
UInt16 port;
|
||||
String default_database;
|
||||
String user;
|
||||
String password;
|
||||
Protocol::Encryption security;
|
||||
Protocol::Compression compression;
|
||||
ConnectionTimeouts timeouts;
|
||||
|
||||
ConnectionParameters() {}
|
||||
|
||||
ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
bool is_secure = config.getBool("secure", false);
|
||||
security = is_secure
|
||||
? Protocol::Encryption::Enable
|
||||
: Protocol::Encryption::Disable;
|
||||
|
||||
host = config.getString("host", "localhost");
|
||||
port = config.getInt("port",
|
||||
config.getInt(is_secure ? "tcp_secure_port" : "tcp_port",
|
||||
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
|
||||
|
||||
default_database = config.getString("database", "");
|
||||
user = config.getString("user", "");
|
||||
password = config.getString("password", "");
|
||||
|
||||
compression = config.getBool("compression", true)
|
||||
? Protocol::Compression::Enable
|
||||
: Protocol::Compression::Disable;
|
||||
|
||||
timeouts = ConnectionTimeouts(
|
||||
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
||||
}
|
||||
};
|
||||
|
||||
ConnectionParameters connection_parameters;
|
||||
|
||||
|
||||
void initialize(Poco::Util::Application & self)
|
||||
{
|
||||
Poco::Util::Application::initialize(self);
|
||||
@ -309,6 +354,7 @@ private:
|
||||
echo_queries = config().getBool("echo", false);
|
||||
}
|
||||
|
||||
connection_parameters = ConnectionParameters(config());
|
||||
connect();
|
||||
|
||||
/// Initialize DateLUT here to avoid counting time spent here as query execution time.
|
||||
@ -338,8 +384,46 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
Strings keys;
|
||||
|
||||
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
|
||||
|
||||
config().keys("prompt_by_server_display_name", keys);
|
||||
|
||||
for (const String & key : keys)
|
||||
{
|
||||
if (key != "default" && server_display_name.find(key) != std::string::npos)
|
||||
{
|
||||
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name." + key);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// Prompt may contain escape sequences including \e[ or \x1b[ sequences to set terminal color.
|
||||
{
|
||||
String unescaped_prompt_by_server_display_name;
|
||||
ReadBufferFromString in(prompt_by_server_display_name);
|
||||
readEscapedString(unescaped_prompt_by_server_display_name, in);
|
||||
prompt_by_server_display_name = std::move(unescaped_prompt_by_server_display_name);
|
||||
}
|
||||
|
||||
/// Prompt may contain the following substitutions in a form of {name}.
|
||||
std::map<String, String> prompt_substitutions
|
||||
{
|
||||
{"host", connection_parameters.host},
|
||||
{"port", toString(connection_parameters.port)},
|
||||
{"user", connection_parameters.user},
|
||||
{"display_name", server_display_name},
|
||||
};
|
||||
|
||||
/// Quite suboptimal.
|
||||
for (const auto & [key, value]: prompt_substitutions)
|
||||
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
if (!query_id.empty())
|
||||
throw Exception("query_id could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
|
||||
if (print_time_to_stderr)
|
||||
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
@ -374,6 +458,7 @@ private:
|
||||
}
|
||||
else
|
||||
{
|
||||
query_id = config().getString("query_id", "");
|
||||
nonInteractive();
|
||||
|
||||
if (last_exception)
|
||||
@ -386,34 +471,23 @@ private:
|
||||
|
||||
void connect()
|
||||
{
|
||||
auto encryption = config().getBool("ssl", false)
|
||||
? Protocol::Encryption::Enable
|
||||
: Protocol::Encryption::Disable;
|
||||
|
||||
String host = config().getString("host", "localhost");
|
||||
UInt16 port = config().getInt("port", config().getInt(static_cast<bool>(encryption) ? "tcp_ssl_port" : "tcp_port", static_cast<bool>(encryption) ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
|
||||
String default_database = config().getString("database", "");
|
||||
String user = config().getString("user", "");
|
||||
String password = config().getString("password", "");
|
||||
|
||||
auto compression = config().getBool("compression", true)
|
||||
? Protocol::Compression::Enable
|
||||
: Protocol::Compression::Disable;
|
||||
|
||||
if (is_interactive)
|
||||
std::cout << "Connecting to "
|
||||
<< (!default_database.empty() ? "database " + default_database + " at " : "")
|
||||
<< host << ":" << port
|
||||
<< (!user.empty() ? " as user " + user : "")
|
||||
<< (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at " : "")
|
||||
<< connection_parameters.host << ":" << connection_parameters.port
|
||||
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "")
|
||||
<< "." << std::endl;
|
||||
|
||||
ConnectionTimeouts timeouts(
|
||||
Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
||||
|
||||
connection = std::make_unique<Connection>(
|
||||
host, port, default_database, user, password, timeouts, "client", compression, encryption);
|
||||
connection_parameters.host,
|
||||
connection_parameters.port,
|
||||
connection_parameters.default_database,
|
||||
connection_parameters.user,
|
||||
connection_parameters.password,
|
||||
connection_parameters.timeouts,
|
||||
"client",
|
||||
connection_parameters.compression,
|
||||
connection_parameters.security);
|
||||
|
||||
String server_name;
|
||||
UInt64 server_version_major = 0;
|
||||
@ -429,6 +503,12 @@ private:
|
||||
connection->getServerVersion(server_name, server_version_major, server_version_minor, server_revision);
|
||||
|
||||
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_revision);
|
||||
|
||||
if (server_display_name = connection->getServerDisplayName(); server_display_name.length() == 0)
|
||||
{
|
||||
server_display_name = config().getString("host", "localhost");
|
||||
}
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
std::cout << "Connected to " << server_name
|
||||
@ -449,12 +529,17 @@ private:
|
||||
return select(1, &fds, 0, 0, &timeout) == 1;
|
||||
}
|
||||
|
||||
inline const String prompt() const
|
||||
{
|
||||
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
|
||||
}
|
||||
|
||||
void loop()
|
||||
{
|
||||
String query;
|
||||
String prev_query;
|
||||
while (char * line_ = readline(query.empty() ? ":) " : ":-] "))
|
||||
|
||||
while (char * line_ = readline(query.empty() ? prompt().c_str() : ":-] "))
|
||||
{
|
||||
String line = line_;
|
||||
free(line_);
|
||||
@ -735,7 +820,7 @@ private:
|
||||
/// Process the query that doesn't require transfering data blocks to the server.
|
||||
void processOrdinaryQuery()
|
||||
{
|
||||
connection->sendQuery(query, "", QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
|
||||
connection->sendQuery(query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
|
||||
sendExternalTables();
|
||||
receiveResult();
|
||||
}
|
||||
@ -753,7 +838,7 @@ private:
|
||||
if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
|
||||
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
|
||||
|
||||
connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
|
||||
connection->sendQuery(query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
|
||||
sendExternalTables();
|
||||
|
||||
/// Receive description of table structure.
|
||||
@ -1289,6 +1374,7 @@ public:
|
||||
("ssl,s", "ssl")
|
||||
("user,u", boost::program_options::value<std::string>(), "user")
|
||||
("password", boost::program_options::value<std::string>(), "password")
|
||||
("query_id", boost::program_options::value<std::string>(), "query_id")
|
||||
("query,q", boost::program_options::value<std::string>(), "query")
|
||||
("database,d", boost::program_options::value<std::string>(), "database")
|
||||
("pager", boost::program_options::value<std::string>(), "pager")
|
||||
@ -1377,6 +1463,8 @@ public:
|
||||
config().setString("config-file", options["config-file"].as<std::string>());
|
||||
if (options.count("host") && !options["host"].defaulted())
|
||||
config().setString("host", options["host"].as<std::string>());
|
||||
if (options.count("query_id"))
|
||||
config().setString("query_id", options["query_id"].as<std::string>());
|
||||
if (options.count("query"))
|
||||
config().setString("query", options["query"].as<std::string>());
|
||||
if (options.count("database"))
|
||||
|
@ -1128,7 +1128,7 @@ protected:
|
||||
catch (const zkutil::KeeperException & e)
|
||||
{
|
||||
LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name
|
||||
<< ". Will recheck the partition. Error: " << e.what());
|
||||
<< ". Will recheck the partition. Error: " << e.displayText());
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -1259,8 +1259,10 @@ protected:
|
||||
}
|
||||
|
||||
/// Remove the locking node
|
||||
cleaner_holder.reset();
|
||||
zookeeper->remove(is_dirty_flag_path);
|
||||
zkutil::Ops ops;
|
||||
ops.emplace_back(new zkutil::Op::Remove(dirt_cleaner_path, -1));
|
||||
ops.emplace_back(new zkutil::Op::Remove(is_dirty_flag_path, -1));
|
||||
zookeeper->multi(ops);
|
||||
|
||||
LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
|
||||
return true;
|
||||
@ -1283,6 +1285,7 @@ protected:
|
||||
Stopwatch watch;
|
||||
TasksShard expected_shards;
|
||||
size_t num_failed_shards = 0;
|
||||
bool previous_shard_is_instantly_finished = false;
|
||||
|
||||
++cluster_partition.total_tries;
|
||||
|
||||
@ -1328,15 +1331,20 @@ protected:
|
||||
|
||||
expected_shards.emplace_back(shard);
|
||||
|
||||
/// Do not sleep if there is a sequence of already processed shards to increase startup
|
||||
bool sleep_before_execution = !previous_shard_is_instantly_finished && shard->priority.is_remote;
|
||||
PartitionTaskStatus task_status = PartitionTaskStatus::Error;
|
||||
bool was_error = false;
|
||||
for (size_t try_num = 0; try_num < max_shard_partition_tries; ++try_num)
|
||||
{
|
||||
task_status = tryProcessPartitionTask(partition);
|
||||
task_status = tryProcessPartitionTask(partition, sleep_before_execution);
|
||||
|
||||
/// Exit if success
|
||||
if (task_status == PartitionTaskStatus::Finished)
|
||||
break;
|
||||
|
||||
was_error = true;
|
||||
|
||||
/// Skip if the task is being processed by someone
|
||||
if (task_status == PartitionTaskStatus::Active)
|
||||
break;
|
||||
@ -1347,6 +1355,8 @@ protected:
|
||||
|
||||
if (task_status == PartitionTaskStatus::Error)
|
||||
++num_failed_shards;
|
||||
|
||||
previous_shard_is_instantly_finished = !was_error;
|
||||
}
|
||||
|
||||
cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
|
||||
@ -1413,13 +1423,13 @@ protected:
|
||||
Error,
|
||||
};
|
||||
|
||||
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition)
|
||||
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition, bool sleep_before_execution)
|
||||
{
|
||||
PartitionTaskStatus res;
|
||||
|
||||
try
|
||||
{
|
||||
res = processPartitionTaskImpl(task_partition);
|
||||
res = processPartitionTaskImpl(task_partition, sleep_before_execution);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -1440,7 +1450,7 @@ protected:
|
||||
return res;
|
||||
}
|
||||
|
||||
PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition)
|
||||
PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition, bool sleep_before_execution)
|
||||
{
|
||||
TaskShard & task_shard = task_partition.task_shard;
|
||||
TaskTable & task_table = task_shard.task_table;
|
||||
@ -1480,7 +1490,7 @@ protected:
|
||||
};
|
||||
|
||||
/// Load balancing
|
||||
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, task_shard.priority.is_remote);
|
||||
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, sleep_before_execution);
|
||||
|
||||
LOG_DEBUG(log, "Processing " << current_task_status_path);
|
||||
|
||||
@ -1654,7 +1664,7 @@ protected:
|
||||
}
|
||||
|
||||
using ExistsFuture = zkutil::ZooKeeper::ExistsFuture;
|
||||
auto future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
|
||||
std::unique_ptr<ExistsFuture> future_is_dirty_checker;
|
||||
|
||||
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
||||
constexpr size_t check_period_milliseconds = 500;
|
||||
@ -1665,9 +1675,15 @@ protected:
|
||||
if (zookeeper->expired())
|
||||
throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
|
||||
|
||||
if (future_is_dirty_checker != nullptr)
|
||||
if (!future_is_dirty_checker)
|
||||
future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
|
||||
|
||||
/// check_period_milliseconds should less than average insert time of single block
|
||||
/// Otherwise, the insertion will slow a little bit
|
||||
if (watch.elapsedMilliseconds() >= check_period_milliseconds)
|
||||
{
|
||||
zkutil::ZooKeeper::StatAndExists status;
|
||||
|
||||
try
|
||||
{
|
||||
status = future_is_dirty_checker->get();
|
||||
@ -1687,12 +1703,6 @@ protected:
|
||||
throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
|
||||
}
|
||||
|
||||
if (watch.elapsedMilliseconds() >= check_period_milliseconds)
|
||||
{
|
||||
watch.restart();
|
||||
future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
#include <Common/ExternalTable.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
|
||||
#include <Common/getFQDNOrHostName.h>
|
||||
#include <IO/ReadBufferFromIStream.h>
|
||||
#include <IO/ZlibInflatingReadBuffer.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
@ -198,6 +198,7 @@ HTTPHandler::HTTPHandler(IServer & server_)
|
||||
: server(server_)
|
||||
, log(&Logger::get("HTTPHandler"))
|
||||
{
|
||||
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
|
||||
}
|
||||
|
||||
|
||||
@ -629,7 +630,7 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
|
||||
try
|
||||
{
|
||||
response.setContentType("text/plain; charset=UTF-8");
|
||||
|
||||
response.set("X-ClickHouse-Server-Display-Name", server_display_name);
|
||||
/// For keep-alive to work.
|
||||
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
|
||||
response.setChunkedTransferEncoding(true);
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user