Merge remote-tracking branch 'upstream/master' into nikvas0/bloom_filter_index

This commit is contained in:
Nikita Vasilev 2019-03-11 13:06:09 +03:00
commit d171330599
131 changed files with 1978 additions and 901 deletions

View File

@ -50,6 +50,7 @@ string(TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC)
message (STATUS "CMAKE_BUILD_TYPE: ${CMAKE_BUILD_TYPE}")
set (CMAKE_CONFIGURATION_TYPES "RelWithDebInfo;Debug;Release;MinSizeRel" CACHE STRING "" FORCE)
set (CMAKE_DEBUG_POSTFIX "d" CACHE STRING "Generate debug library name with a postfix.") # To be consistent with CMakeLists from contrib libs.
option (USE_STATIC_LIBRARIES "Set to FALSE to use shared libraries" ON)
option (MAKE_STATIC_LIBRARIES "Set to FALSE to make shared libraries" ${USE_STATIC_LIBRARIES})
@ -98,10 +99,6 @@ if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
if (OS_LINUX AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES AND CMAKE_VERSION VERSION_GREATER "3.9.0")
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
link_libraries(glibc-compatibility)
endif ()
endif ()
endif ()
@ -177,6 +174,60 @@ set (CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -g3 -ggdb3
include (cmake/use_libcxx.cmake)
# Set standard, system and compiler libraries explicitly.
# This is intended for more control of what we are linking.
set (DEFAULT_LIBS "")
if (OS_LINUX AND NOT UNBUNDLED)
# Note: this probably has no effict, but I'm not an expert in CMake.
set (CMAKE_C_IMPLICIT_LINK_LIBRARIES "")
set (CMAKE_CXX_IMPLICIT_LINK_LIBRARIES "")
# Disable default linked libraries.
set (DEFAULT_LIBS "-nodefaultlibs")
# Add C++ libraries.
#
# This consist of:
# - C++ standard library (like implementation of std::string);
# - C++ ABI implementation (functions for exceptions like __cxa_throw, RTTI, etc);
# - functions for internal implementation of exception handling (stack unwinding based on DWARF info; TODO replace with bundled libunwind);
# - compiler builtins (example: functions for implementation of __int128 operations);
#
# There are two variants of C++ library: libc++ (from LLVM compiler infrastructure) and libstdc++ (from GCC).
if (USE_LIBCXX)
set (BUILTINS_LIB_PATH "")
if (COMPILER_CLANG)
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --print-file-name=libclang_rt.builtins-${CMAKE_SYSTEM_PROCESSOR}.a OUTPUT_VARIABLE BUILTINS_LIB_PATH OUTPUT_STRIP_TRAILING_WHITESPACE)
endif ()
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lc++ -lc++abi -lgcc_eh ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
else ()
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lstdc++ -lgcc_eh -lgcc -Wl,-Bdynamic")
endif ()
# Linking with GLIBC prevents portability of binaries to older systems.
# We overcome this behaviour by statically linking with our own implementation of all new symbols (that don't exist in older Libc or have infamous "symbol versioning").
# The order of linking is important: 'glibc-compatibility' must be before libc but after all other libraries.
if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC)
set (CMAKE_POSTFIX_VARIABLE "CMAKE_${CMAKE_BUILD_TYPE_UC}_POSTFIX")
set (DEFAULT_LIBS "${DEFAULT_LIBS} libs/libglibc-compatibility/libglibc-compatibility${${CMAKE_POSTFIX_VARIABLE}}.a")
endif ()
# Add Libc. GLIBC is actually a collection of interdependent libraries.
set (DEFAULT_LIBS "${DEFAULT_LIBS} -lrt -ldl -lpthread -lm -lc")
# Note: we'd rather use Musl libc library, but it's little bit more difficult to use.
message(STATUS "Default libraries: ${DEFAULT_LIBS}")
endif ()
if (NOT MAKE_STATIC_LIBRARIES)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
endif ()
@ -284,3 +335,35 @@ add_subdirectory (utils)
add_subdirectory (dbms)
include (cmake/print_include_directories.cmake)
if (DEFAULT_LIBS)
# Add default libs to all targets as the last dependency.
# I have found no better way to specify default libs in CMake that will appear single time in specific order at the end of linker arguments.
function(add_default_libs target_name)
if (TARGET ${target_name})
# message(STATUS "Has target ${target_name}")
set_property(TARGET ${target_name} APPEND PROPERTY LINK_LIBRARIES "${DEFAULT_LIBS}")
set_property(TARGET ${target_name} APPEND PROPERTY INTERFACE_LINK_LIBRARIES "${DEFAULT_LIBS}")
if (GLIBC_COMPATIBILITY)
add_dependencies(${target_name} glibc-compatibility)
endif ()
endif ()
endfunction ()
add_default_libs(ltdl)
add_default_libs(zlibstatic)
add_default_libs(jemalloc)
add_default_libs(unwind)
add_default_libs(memcpy)
add_default_libs(Foundation)
add_default_libs(common)
add_default_libs(gtest)
add_default_libs(lz4)
add_default_libs(zstd)
add_default_libs(snappy)
add_default_libs(arrow)
add_default_libs(thrift_static)
add_default_libs(boost_regex_internal)
endif ()

View File

@ -11,38 +11,13 @@ if (OS_LINUX AND COMPILER_CLANG)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux with Clang)" ${HAVE_LIBCXX})
set (LIBCXX_PATH "" CACHE STRING "Use custom path for libc++. It should be used for MSan.")
if (USE_LIBCXX)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") # Ok for clang6, for older can cause 'not used option' warning
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=0") # More checks in debug build.
if (MAKE_STATIC_LIBRARIES)
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --print-file-name=libclang_rt.builtins-${CMAKE_SYSTEM_PROCESSOR}.a OUTPUT_VARIABLE BUILTINS_LIB_PATH OUTPUT_STRIP_TRAILING_WHITESPACE)
link_libraries (-nodefaultlibs -Wl,-Bstatic -stdlib=libc++ c++ c++abi gcc_eh ${BUILTINS_LIB_PATH} rt -Wl,-Bdynamic dl pthread m c)
else ()
link_libraries (-stdlib=libc++ c++ c++abi)
endif ()
if (LIBCXX_PATH)
include_directories (SYSTEM BEFORE "${LIBCXX_PATH}/include" "${LIBCXX_PATH}/include/c++/v1")
link_directories ("${LIBCXX_PATH}/lib")
endif ()
endif ()
endif ()
if (USE_LIBCXX)
set (STATIC_STDLIB_FLAGS "")
else ()
set (STATIC_STDLIB_FLAGS "-static-libgcc -static-libstdc++")
endif ()
if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (COMPILER_CLANG AND OS_FREEBSD))
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${STATIC_STDLIB_FLAGS}")
# Along with executables, we also build example of shared library for "library dictionary source"; and it also should be self-contained.
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${STATIC_STDLIB_FLAGS}")
endif ()
if (USE_STATIC_LIBRARIES AND HAVE_NO_PIE)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${FLAG_NO_PIE}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FLAG_NO_PIE}")

2
contrib/cppkafka vendored

@ -1 +1 @@
Subproject commit 860c90e92eee6690aa74a2ca7b7c5c6930dffecd
Subproject commit 9b184d881c15cc50784b28688c7c99d3d764db24

2
contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit 363dcad5a23dc29381cc626620e68ae418b3af19
Subproject commit 51ae5f5fd8b742e56f47a8bb0136344868818285

View File

@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54415)
set(VERSION_REVISION 54417)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 3)
set(VERSION_PATCH 4)
set(VERSION_GITHASH 263e69e861b769eae7e2bcc79d87673e3a08d376)
set(VERSION_DESCRIBE v19.3.4-testing)
set(VERSION_STRING 19.3.4)
set(VERSION_MINOR 5)
set(VERSION_PATCH 1)
set(VERSION_GITHASH 628ed349c335b79a441a1bd6e4bc791d61dfe62c)
set(VERSION_DESCRIBE v19.5.1.1-testing)
set(VERSION_STRING 19.5.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -10,7 +10,7 @@ set(CLICKHOUSE_SERVER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp
)
set(CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io daemon clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Poco_Net_LIBRARY})
set(CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Poco_Net_LIBRARY})
if (USE_POCO_NETSSL)
set(CLICKHOUSE_SERVER_LINK ${CLICKHOUSE_SERVER_LINK} PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})
endif ()

View File

@ -163,7 +163,7 @@ public:
size_t old_size = data_to.size();
data_to.resize(data_to.size() + size);
data.getManyFloat(levels.levels.data(), levels.permutation.data(), size, &data_to[old_size]);
data.getManyFloat(levels.levels.data(), levels.permutation.data(), size, data_to.data() + old_size);
}
else
{
@ -171,7 +171,7 @@ public:
size_t old_size = data_to.size();
data_to.resize(data_to.size() + size);
data.getMany(levels.levels.data(), levels.permutation.data(), size, &data_to[old_size]);
data.getMany(levels.levels.data(), levels.permutation.data(), size, data_to.data() + old_size);
}
}
else

View File

@ -39,19 +39,19 @@ class AggregateFunctionTopKDateTime : public AggregateFunctionTopK<DataTypeDateT
template <bool is_weighted>
static IAggregateFunction * createWithExtraTypes(const DataTypePtr & argument_type, UInt64 threshold, const Array & params)
static IAggregateFunction * createWithExtraTypes(const DataTypePtr & argument_type, UInt64 threshold, UInt64 load_factor, const Array & params)
{
WhichDataType which(argument_type);
if (which.idx == TypeIndex::Date)
return new AggregateFunctionTopKDate<is_weighted>(threshold, {argument_type}, params);
return new AggregateFunctionTopKDate<is_weighted>(threshold, load_factor, {argument_type}, params);
if (which.idx == TypeIndex::DateTime)
return new AggregateFunctionTopKDateTime<is_weighted>(threshold, {argument_type}, params);
return new AggregateFunctionTopKDateTime<is_weighted>(threshold, load_factor, {argument_type}, params);
/// Check that we can use plain version of AggregateFunctionTopKGeneric
if (argument_type->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
return new AggregateFunctionTopKGeneric<true, is_weighted>(threshold, argument_type, params);
return new AggregateFunctionTopKGeneric<true, is_weighted>(threshold, load_factor, argument_type, params);
else
return new AggregateFunctionTopKGeneric<false, is_weighted>(threshold, argument_type, params);
return new AggregateFunctionTopKGeneric<false, is_weighted>(threshold, load_factor, argument_type, params);
}
@ -65,19 +65,28 @@ AggregateFunctionPtr createAggregateFunctionTopK(const std::string & name, const
else
{
assertBinary(name, argument_types);
if (!isNumber(argument_types[1]))
throw Exception("The second argument for aggregate function 'topKWeighted' must have numeric type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isInteger(argument_types[1]))
throw Exception("The second argument for aggregate function 'topKWeighted' must have integer type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
UInt64 threshold = 10; /// default value
UInt64 threshold = 10; /// default values
UInt64 load_factor = 3;
if (!params.empty())
{
if (params.size() != 1)
throw Exception("Aggregate function " + name + " requires one parameter or less.",
if (params.size() > 2)
throw Exception("Aggregate function " + name + " requires two parameters or less.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
UInt64 k = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), params[0]);
if (params.size() == 2)
{
load_factor = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), params[1]);
if (load_factor < 1)
throw Exception("Too small parameter for aggregate function " + name + ". Minimum: 1",
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (k > TOP_K_MAX_SIZE)
throw Exception("Too large parameter for aggregate function " + name + ". Maximum: " + toString(TOP_K_MAX_SIZE),
@ -90,10 +99,10 @@ AggregateFunctionPtr createAggregateFunctionTopK(const std::string & name, const
threshold = k;
}
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionTopK, is_weighted>(*argument_types[0], threshold, argument_types, params));
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionTopK, is_weighted>(*argument_types[0], threshold, load_factor, argument_types, params));
if (!res)
res = AggregateFunctionPtr(createWithExtraTypes<is_weighted>(argument_types[0], threshold, params));
res = AggregateFunctionPtr(createWithExtraTypes<is_weighted>(argument_types[0], threshold, load_factor, params));
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() +

View File

@ -20,10 +20,6 @@ namespace DB
{
// Allow NxK more space before calculating top K to increase accuracy
#define TOP_K_LOAD_FACTOR 3
template <typename T>
struct AggregateFunctionTopKData
{
@ -48,9 +44,9 @@ protected:
UInt64 reserved;
public:
AggregateFunctionTopK(UInt64 threshold, const DataTypes & argument_types_, const Array & params)
AggregateFunctionTopK(UInt64 threshold, UInt64 load_factor, const DataTypes & argument_types_, const Array & params)
: IAggregateFunctionDataHelper<AggregateFunctionTopKData<T>, AggregateFunctionTopK<T, is_weighted>>(argument_types_, params)
, threshold(threshold), reserved(TOP_K_LOAD_FACTOR * threshold) {}
, threshold(threshold), reserved(load_factor * threshold) {}
String getName() const override { return is_weighted ? "topKWeighted" : "topK"; }
@ -143,9 +139,9 @@ private:
public:
AggregateFunctionTopKGeneric(
UInt64 threshold, const DataTypePtr & input_data_type, const Array & params)
UInt64 threshold, UInt64 load_factor, const DataTypePtr & input_data_type, const Array & params)
: IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, AggregateFunctionTopKGeneric<is_plain_column, is_weighted>>({input_data_type}, params)
, threshold(threshold), reserved(TOP_K_LOAD_FACTOR * threshold), input_data_type(this->argument_types[0]) {}
, threshold(threshold), reserved(load_factor * threshold), input_data_type(this->argument_types[0]) {}
String getName() const override { return is_weighted ? "topKWeighted" : "topK"; }
@ -238,6 +234,4 @@ public:
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef TOP_K_LOAD_FACTOR
}

View File

@ -85,7 +85,7 @@ class QuantileTDigest
Params params;
/// The memory will be allocated to several elements at once, so that the state occupies 64 bytes.
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArray<Centroid>) - sizeof(Count) - sizeof(UInt32);
static constexpr size_t bytes_in_arena = 128 - sizeof(PODArray<Centroid>) - sizeof(Count) - sizeof(UInt32);
using Summary = PODArray<Centroid, bytes_in_arena / sizeof(Centroid), AllocatorWithStackMemory<Allocator<false>, bytes_in_arena>>;

View File

@ -152,7 +152,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
size_t old_size = data.size();
data.resize(old_size + length);
memcpy(&data[old_size], &from_concrete.data[start], length * sizeof(data[0]));
memcpy(data.data() + old_size, &from_concrete.data[start], length * sizeof(data[0]));
}
}
@ -255,6 +255,11 @@ size_t ColumnAggregateFunction::allocatedBytes() const
return res;
}
void ColumnAggregateFunction::protect()
{
data.protect();
}
MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const
{
return create(func, Arenas(1, std::make_shared<Arena>()));

View File

@ -157,6 +157,8 @@ public:
size_t allocatedBytes() const override;
void protect() override;
void insertRangeFrom(const IColumn & from, size_t start, size_t length) override;
void popBack(size_t n) override;

View File

@ -311,6 +311,13 @@ size_t ColumnArray::allocatedBytes() const
}
void ColumnArray::protect()
{
getData().protect();
getOffsets().protect();
}
bool ColumnArray::hasEqualOffsets(const ColumnArray & other) const
{
if (offsets == other.offsets)

View File

@ -78,6 +78,7 @@ public:
void reserve(size_t n) override;
size_t byteSize() const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
ColumnPtr convertToFullColumnIfConst() const override;
void getExtremes(Field & min, Field & max) const override;

View File

@ -140,7 +140,7 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
size_t old_size = data.size();
data.resize(old_size + length);
memcpy(&data[old_size], &src_vec.data[start], length * sizeof(data[0]));
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
}
template <typename T>

View File

@ -87,6 +87,7 @@ public:
size_t size() const override { return data.size(); }
size_t byteSize() const override { return data.size() * sizeof(data[0]); }
size_t allocatedBytes() const override { return data.allocated_bytes(); }
void protect() override { data.protect(); }
void reserve(size_t n) override { data.reserve(n); }
void insertFrom(const IColumn & src, size_t n) override { data.push_back(static_cast<const Self &>(src).getData()[n]); }

View File

@ -55,7 +55,7 @@ void ColumnFixedString::insert(const Field & x)
size_t old_size = chars.size();
chars.resize_fill(old_size + n);
memcpy(&chars[old_size], s.data(), s.size());
memcpy(chars.data() + old_size, s.data(), s.size());
}
void ColumnFixedString::insertFrom(const IColumn & src_, size_t index)
@ -67,7 +67,7 @@ void ColumnFixedString::insertFrom(const IColumn & src_, size_t index)
size_t old_size = chars.size();
chars.resize(old_size + n);
memcpySmallAllowReadWriteOverflow15(&chars[old_size], &src.chars[n * index], n);
memcpySmallAllowReadWriteOverflow15(chars.data() + old_size, &src.chars[n * index], n);
}
void ColumnFixedString::insertData(const char * pos, size_t length)
@ -77,7 +77,7 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
size_t old_size = chars.size();
chars.resize_fill(old_size + n);
memcpy(&chars[old_size], pos, length);
memcpy(chars.data() + old_size, pos, length);
}
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
@ -91,7 +91,7 @@ const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)
{
size_t old_size = chars.size();
chars.resize(old_size + n);
memcpy(&chars[old_size], pos, n);
memcpy(chars.data() + old_size, pos, n);
return pos + n;
}
@ -151,7 +151,7 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
size_t old_size = chars.size();
chars.resize(old_size + length * n);
memcpy(&chars[old_size], &src_concrete.chars[start * n], length * n);
memcpy(chars.data() + old_size, &src_concrete.chars[start * n], length * n);
}
ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const

View File

@ -57,6 +57,11 @@ public:
return chars.allocated_bytes() + sizeof(n);
}
void protect() override
{
chars.protect();
}
Field operator[](size_t index) const override
{
return String(reinterpret_cast<const char *>(&chars[n * index]), n);

View File

@ -363,7 +363,6 @@ ColumnPtr ColumnLowCardinality::countKeys() const
}
ColumnLowCardinality::Index::Index() : positions(ColumnUInt8::create()), size_of_type(sizeof(UInt8)) {}
ColumnLowCardinality::Index::Index(MutableColumnPtr && positions) : positions(std::move(positions))

View File

@ -291,6 +291,12 @@ size_t ColumnNullable::allocatedBytes() const
return getNestedColumn().allocatedBytes() + getNullMapColumn().allocatedBytes();
}
void ColumnNullable::protect()
{
getNestedColumn().protect();
getNullMapColumn().protect();
}
namespace
{

View File

@ -71,6 +71,7 @@ public:
void reserve(size_t n) override;
size_t byteSize() const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void getExtremes(Field & min, Field & max) const override;

View File

@ -185,7 +185,7 @@ const char * ColumnString::deserializeAndInsertFromArena(const char * pos)
const size_t old_size = chars.size();
const size_t new_size = old_size + string_size;
chars.resize(new_size);
memcpy(&chars[old_size], pos, string_size);
memcpy(chars.data() + old_size, pos, string_size);
offsets.push_back(new_size);
return pos + string_size;
@ -412,4 +412,11 @@ void ColumnString::getPermutationWithCollation(const Collator & collator, bool r
}
}
void ColumnString::protect()
{
getChars().protect();
getOffsets().protect();
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <string.h>
#include <cstring>
#include <cassert>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
@ -67,25 +68,31 @@ public:
return chars.allocated_bytes() + offsets.allocated_bytes();
}
void protect() override;
MutableColumnPtr cloneResized(size_t to_size) const override;
Field operator[](size_t n) const override
{
assert(n < size());
return Field(&chars[offsetAt(n)], sizeAt(n) - 1);
}
void get(size_t n, Field & res) const override
{
assert(n < size());
res.assignString(&chars[offsetAt(n)], sizeAt(n) - 1);
}
StringRef getDataAt(size_t n) const override
{
assert(n < size());
return StringRef(&chars[offsetAt(n)], sizeAt(n) - 1);
}
StringRef getDataAtWithTerminatingZero(size_t n) const override
{
assert(n < size());
return StringRef(&chars[offsetAt(n)], sizeAt(n));
}
@ -103,7 +110,7 @@ public:
const size_t new_size = old_size + size_to_append;
chars.resize(new_size);
memcpy(&chars[old_size], s.c_str(), size_to_append);
memcpy(chars.data() + old_size, s.c_str(), size_to_append);
offsets.push_back(new_size);
}
@ -114,36 +121,22 @@ public:
void insertFrom(const IColumn & src_, size_t n) override
{
const ColumnString & src = static_cast<const ColumnString &>(src_);
const size_t size_to_append = src.offsets[n] - src.offsets[n - 1]; /// -1th index is Ok, see PaddedPODArray.
if (n != 0)
if (size_to_append == 1)
{
const size_t size_to_append = src.offsets[n] - src.offsets[n - 1];
if (size_to_append == 1)
{
/// shortcut for empty string
chars.push_back(0);
offsets.push_back(chars.size());
}
else
{
const size_t old_size = chars.size();
const size_t offset = src.offsets[n - 1];
const size_t new_size = old_size + size_to_append;
chars.resize(new_size);
memcpySmallAllowReadWriteOverflow15(&chars[old_size], &src.chars[offset], size_to_append);
offsets.push_back(new_size);
}
/// shortcut for empty string
chars.push_back(0);
offsets.push_back(chars.size());
}
else
{
const size_t old_size = chars.size();
const size_t size_to_append = src.offsets[0];
const size_t offset = src.offsets[n - 1];
const size_t new_size = old_size + size_to_append;
chars.resize(new_size);
memcpySmallAllowReadWriteOverflow15(&chars[old_size], &src.chars[0], size_to_append);
memcpySmallAllowReadWriteOverflow15(chars.data() + old_size, &src.chars[offset], size_to_append);
offsets.push_back(new_size);
}
}
@ -155,7 +148,7 @@ public:
chars.resize(new_size);
if (length)
memcpy(&chars[old_size], pos, length);
memcpy(chars.data() + old_size, pos, length);
chars[old_size + length] = 0;
offsets.push_back(new_size);
}
@ -167,7 +160,7 @@ public:
const size_t new_size = old_size + length;
chars.resize(new_size);
memcpy(&chars[old_size], pos, length);
memcpy(chars.data() + old_size, pos, length);
offsets.push_back(new_size);
}

View File

@ -315,6 +315,12 @@ size_t ColumnTuple::allocatedBytes() const
return res;
}
void ColumnTuple::protect()
{
for (auto & column : columns)
column->assumeMutableRef().protect();
}
void ColumnTuple::getExtremes(Field & min, Field & max) const
{
const size_t tuple_size = columns.size();

View File

@ -71,6 +71,7 @@ public:
void reserve(size_t n) override;
size_t byteSize() const override;
size_t allocatedBytes() const override;
void protect() override;
void forEachSubcolumn(ColumnCallback callback) override;
size_t tupleSize() const { return columns.size(); }

View File

@ -80,6 +80,7 @@ public:
bool isNumeric() const override { return column_holder->isNumeric(); }
size_t byteSize() const override { return column_holder->byteSize(); }
void protect() override { column_holder->assumeMutableRef().protect(); }
size_t allocatedBytes() const override
{
return column_holder->allocatedBytes()

View File

@ -141,7 +141,7 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
size_t old_size = data.size();
data.resize(old_size + length);
memcpy(&data[old_size], &src_vec.data[start], length * sizeof(data[0]));
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
}
template <typename T>

View File

@ -163,6 +163,11 @@ public:
return data.allocated_bytes();
}
void protect() override
{
data.protect();
}
void insertValue(const T value)
{
data.push_back(value);

View File

@ -24,9 +24,10 @@ namespace DB
class ColumnVectorHelper : public IColumn
{
public:
template <size_t ELEMENT_SIZE>
const char * getRawDataBegin() const
{
return *reinterpret_cast<const char * const *>(reinterpret_cast<const char *>(this) + sizeof(*this));
return reinterpret_cast<const PODArrayBase<ELEMENT_SIZE, 4096, Allocator<false>, 15, 16> *>(reinterpret_cast<const char *>(this) + sizeof(*this))->raw_data();
}
template <size_t ELEMENT_SIZE>

View File

@ -253,6 +253,10 @@ public:
/// Zero, if could be determined.
virtual size_t allocatedBytes() const = 0;
/// Make memory region readonly with mprotect if it is large enough.
/// The operation is slow and performed only for debug builds.
virtual void protect() {}
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
/// Shallow: doesn't do recursive calls; don't do call for itself.
using ColumnCallback = std::function<void(Ptr&)>;

View File

@ -43,11 +43,30 @@ namespace ErrorCodes
*
* PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB.
*/
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
#ifdef NDEBUG
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
#else
/// In debug build, use small mmap threshold to reproduce more memory stomping bugs.
/// Along with ASLR it will hopefully detect more issues than ASan.
/// The program may fail due to the limit on number of memory mappings.
static constexpr size_t MMAP_THRESHOLD = 4096;
#endif
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
template <bool clear_memory_>
void * Allocator<clear_memory_>::mmap_hint()
{
#if ALLOCATOR_ASLR
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(rng));
#else
return nullptr;
#endif
}
template <bool clear_memory_>
void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
{
@ -61,7 +80,7 @@ void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating "
+ formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS);
buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
buf = mmap(mmap_hint(), size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);

View File

@ -2,6 +2,19 @@
#include <string.h>
#ifdef NDEBUG
/// If set to 1 - randomize memory mappings manually (address space layout randomization) to reproduce more memory stomping bugs.
/// Note that Linux doesn't do it by default. This may lead to worse TLB performance.
#define ALLOCATOR_ASLR 0
#else
#define ALLOCATOR_ASLR 1
#endif
#if ALLOCATOR_ASLR
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#endif
/** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena.
* Also used in hash tables.
@ -14,6 +27,12 @@
template <bool clear_memory_>
class Allocator
{
#if ALLOCATOR_ASLR
private:
pcg64 rng{randomSeed()};
#endif
void * mmap_hint();
protected:
static constexpr bool clear_memory = clear_memory_;

View File

@ -419,6 +419,7 @@ namespace ErrorCodes
extern const int BAD_DATABASE_FOR_TEMPORARY_TABLE = 442;
extern const int NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA = 443;
extern const int UNKNOWN_PROTOBUF_FORMAT = 444;
extern const int CANNOT_MPROTECT = 445;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -2,6 +2,7 @@
#include <string.h>
#include <cstddef>
#include <cassert>
#include <algorithm>
#include <memory>
@ -16,10 +17,19 @@
#include <Common/BitHelpers.h>
#include <Common/memcpySmall.h>
#ifndef NDEBUG
#include <sys/mman.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_MPROTECT;
}
inline constexpr size_t integerRoundUp(size_t value, size_t dividend)
{
return ((value + dividend - 1) / dividend) * dividend;
@ -107,6 +117,8 @@ protected:
if (c_start == null)
return;
unprotect();
TAllocator::free(c_start - pad_left, allocated_bytes());
}
@ -119,6 +131,8 @@ protected:
return;
}
unprotect();
ptrdiff_t end_diff = c_end - c_start;
c_start = reinterpret_cast<char *>(
@ -154,11 +168,34 @@ protected:
realloc(allocated_bytes() * 2, std::forward<TAllocatorParams>(allocator_params)...);
}
#ifndef NDEBUG
/// Make memory region readonly with mprotect if it is large enough.
/// The operation is slow and performed only for debug builds.
void protectImpl(int prot)
{
static constexpr size_t PAGE_SIZE = 4096;
char * left_rounded_up = reinterpret_cast<char *>((reinterpret_cast<intptr_t>(c_start) - pad_left + PAGE_SIZE - 1) / PAGE_SIZE * PAGE_SIZE);
char * right_rounded_down = reinterpret_cast<char *>((reinterpret_cast<intptr_t>(c_end_of_storage) + pad_right) / PAGE_SIZE * PAGE_SIZE);
if (right_rounded_down > left_rounded_up)
{
size_t length = right_rounded_down - left_rounded_up;
if (0 != mprotect(left_rounded_up, length, prot))
throwFromErrno("Cannot mprotect memory region", ErrorCodes::CANNOT_MPROTECT);
}
}
/// Restore memory protection in destructor or realloc for further reuse by allocator.
bool mprotected = false;
#endif
public:
bool empty() const { return c_end == c_start; }
size_t size() const { return (c_end - c_start) / ELEMENT_SIZE; }
size_t capacity() const { return (c_end_of_storage - c_start) / ELEMENT_SIZE; }
/// This method is safe to use only for information about memory usage.
size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right + pad_left; }
void clear() { c_end = c_start; }
@ -197,6 +234,23 @@ public:
c_end += byte_size(1);
}
void protect()
{
#ifndef NDEBUG
protectImpl(PROT_READ);
mprotected = true;
#endif
}
void unprotect()
{
#ifndef NDEBUG
if (mprotected)
protectImpl(PROT_WRITE);
mprotected = false;
#endif
}
~PODArrayBase()
{
dealloc();
@ -271,8 +325,18 @@ public:
const T * data() const { return t_start(); }
/// The index is signed to access -1th element without pointer overflow.
T & operator[] (ssize_t n) { return t_start()[n]; }
const T & operator[] (ssize_t n) const { return t_start()[n]; }
T & operator[] (ssize_t n)
{
/// <= size, because taking address of one element past memory range is Ok in C++ (expression like &arr[arr.size()] is perfectly valid).
assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) && (n <= static_cast<ssize_t>(this->size())));
return t_start()[n];
}
const T & operator[] (ssize_t n) const
{
assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) && (n <= static_cast<ssize_t>(this->size())));
return t_start()[n];
}
T & front() { return t_start()[0]; }
T & back() { return t_end()[-1]; }
@ -390,6 +454,11 @@ public:
void swap(PODArray & rhs)
{
#ifndef NDEBUG
this->unprotect();
rhs.unprotect();
#endif
/// Swap two PODArray objects, arr1 and arr2, that satisfy the following conditions:
/// - The elements of arr1 are stored on stack.
/// - The elements of arr2 are stored on heap.
@ -438,7 +507,9 @@ public:
};
if (!this->isInitialized() && !rhs.isInitialized())
{
return;
}
else if (!this->isInitialized() && rhs.isInitialized())
{
do_move(rhs, *this);
@ -482,9 +553,13 @@ public:
rhs.c_end = rhs.c_start + this->byte_size(lhs_size);
}
else if (this->isAllocatedFromStack() && !rhs.isAllocatedFromStack())
{
swap_stack_heap(*this, rhs);
}
else if (!this->isAllocatedFromStack() && rhs.isAllocatedFromStack())
{
swap_stack_heap(rhs, *this);
}
else
{
std::swap(this->c_start, rhs.c_start);

View File

@ -19,7 +19,6 @@ namespace ErrorCodes
{
extern const int UNKNOWN_CODEC;
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS;
}
@ -85,7 +84,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const UInt8 byte_code) const
const auto family_code_and_creator = family_code_with_codec.find(byte_code);
if (family_code_and_creator == family_code_with_codec.end())
throw Exception("Unknown codec family code : " + toString(byte_code), ErrorCodes::UNKNOWN_CODEC);
throw Exception("Unknown codec family code: " + toString(byte_code), ErrorCodes::UNKNOWN_CODEC);
return family_code_and_creator->second({}, nullptr);
}

View File

@ -113,6 +113,7 @@ namespace Graphite
struct Pattern
{
std::shared_ptr<OptimizedRegularExpression> regexp;
std::string regexp_str;
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
@ -124,6 +125,7 @@ namespace Graphite
struct Params
{
String config_name;
String path_column_name;
String time_column_name;
String value_column_name;
@ -215,6 +217,7 @@ private:
const Graphite::Pattern undef_pattern =
{ /// temporary empty pattern for selectPatternForPath
nullptr,
"",
nullptr,
DB::Graphite::Retentions(),
undef_pattern.TypeUndef,

View File

@ -6,6 +6,7 @@
#include <DataStreams/SizeLimits.h>
#include <IO/Progress.h>
#include <Interpreters/SettingsCommon.h>
#include <Storages/TableStructureLockHolder.h>
#include <atomic>
#include <shared_mutex>
@ -24,12 +25,9 @@ class IBlockInputStream;
class ProcessListElement;
class QuotaForIntervals;
class QueryStatus;
class TableStructureReadLock;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
/** Callback to track the progress of the query.
* Used in IBlockInputStream and Context.
@ -117,7 +115,7 @@ public:
size_t checkDepth(size_t max_depth) const { return checkDepthImpl(max_depth, max_depth); }
/// Do not allow to change the table while the blocks stream and its children are alive.
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
@ -244,7 +242,7 @@ public:
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
TableStructureReadLocks table_locks;
std::vector<TableStructureReadLockHolder> table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;

View File

@ -5,6 +5,7 @@
#include <memory>
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
#include <Storages/TableStructureLockHolder.h>
namespace DB
@ -12,13 +13,6 @@ namespace DB
struct Progress;
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
struct Progress;
/** Interface of stream for writing data (into table, filesystem, network, terminal, etc.)
*/
class IBlockOutputStream : private boost::noncopyable
@ -64,10 +58,10 @@ public:
/** Don't let to alter table while instance of stream is alive.
*/
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
private:
TableStructureReadLocks table_locks;
std::vector<TableStructureReadLockHolder> table_locks;
};
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;

View File

@ -6,9 +6,14 @@
namespace DB
{
LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_)
LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, bool use_limit_as_total_rows_approx)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_)
{
if (use_limit_as_total_rows_approx)
{
addTotalRowsApprox(static_cast<size_t>(limit));
}
children.push_back(input);
}

View File

@ -16,8 +16,9 @@ public:
* returns an empty block, and this causes the query to be canceled.
* If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
*/
LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_ = false);
LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false);
String getName() const override { return "Limit"; }

View File

@ -20,7 +20,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(storage->lockStructure(true, context.getCurrentQueryId()));
addTableLock(storage->lockStructureForShare(true, context.getCurrentQueryId()));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -45,7 +45,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
if (StoragePtr inner_table = materialized_view.tryGetTargetTable())
addTableLock(inner_table->lockStructure(true, context.getCurrentQueryId()));
addTableLock(inner_table->lockStructureForShare(true, context.getCurrentQueryId()));
auto query = materialized_view.getInnerQuery();
BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>(

View File

@ -69,7 +69,7 @@ void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr)
data.resize(old_size + n);
try
{
istr.readStrict(reinterpret_cast<char *>(&data[old_size]), n);
istr.readStrict(reinterpret_cast<char *>(data.data() + old_size), n);
}
catch (...)
{

View File

@ -130,9 +130,9 @@ static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnSt
if (size)
{
#ifdef __x86_64__
#ifdef __SSE2__
/// An optimistic branch in which more efficient copying is possible.
if (offset + 16 * UNROLL_TIMES <= data.allocated_bytes() && istr.position() + size + 16 * UNROLL_TIMES <= istr.buffer().end())
if (offset + 16 * UNROLL_TIMES <= data.capacity() && istr.position() + size + 16 * UNROLL_TIMES <= istr.buffer().end())
{
const __m128i * sse_src_pos = reinterpret_cast<const __m128i *>(istr.position());
const __m128i * sse_src_end = sse_src_pos + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES;
@ -140,22 +140,11 @@ static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnSt
while (sse_src_pos < sse_src_end)
{
/// NOTE gcc 4.9.2 unrolls the loop, but for some reason uses only one xmm register.
/// for (size_t j = 0; j < UNROLL_TIMES; ++j)
/// _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j));
for (size_t j = 0; j < UNROLL_TIMES; ++j)
_mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j));
sse_src_pos += UNROLL_TIMES;
sse_dst_pos += UNROLL_TIMES;
if (UNROLL_TIMES >= 4) __asm__("movdqu %0, %%xmm0" :: "m"(sse_src_pos[-4]));
if (UNROLL_TIMES >= 3) __asm__("movdqu %0, %%xmm1" :: "m"(sse_src_pos[-3]));
if (UNROLL_TIMES >= 2) __asm__("movdqu %0, %%xmm2" :: "m"(sse_src_pos[-2]));
if (UNROLL_TIMES >= 1) __asm__("movdqu %0, %%xmm3" :: "m"(sse_src_pos[-1]));
if (UNROLL_TIMES >= 4) __asm__("movdqu %%xmm0, %0" : "=m"(sse_dst_pos[-4]));
if (UNROLL_TIMES >= 3) __asm__("movdqu %%xmm1, %0" : "=m"(sse_dst_pos[-3]));
if (UNROLL_TIMES >= 2) __asm__("movdqu %%xmm2, %0" : "=m"(sse_dst_pos[-2]));
if (UNROLL_TIMES >= 1) __asm__("movdqu %%xmm3, %0" : "=m"(sse_dst_pos[-1]));
}
istr.position() += size;

View File

@ -77,7 +77,7 @@ void RegionsNames::reload()
throw Poco::Exception("Logical error. Maybe size estimate of " + names_source->getSourceName() + " is wrong.");
new_chars.resize(old_size + name_entry.name.length() + 1);
memcpy(&new_chars[old_size], name_entry.name.c_str(), name_entry.name.length() + 1);
memcpy(new_chars.data() + old_size, name_entry.name.c_str(), name_entry.name.length() + 1);
if (name_entry.id > max_region_id)
{
@ -92,7 +92,7 @@ void RegionsNames::reload()
while (name_entry.id >= new_names_refs.size())
new_names_refs.resize(new_names_refs.size() * 2, StringRef("", 0));
new_names_refs[name_entry.id] = StringRef(&new_chars[old_size], name_entry.name.length());
new_names_refs[name_entry.id] = StringRef(new_chars.data() + old_size, name_entry.name.length());
}
chars[language_id].swap(new_chars);

View File

@ -59,7 +59,7 @@ namespace
{
size_t old_size = buf.size();
buf.reserve(old_size + MAX_VARINT_SIZE);
UInt8 * ptr = &buf[old_size];
UInt8 * ptr = buf.data() + old_size;
ptr = writeVarint(value, ptr);
buf.resize_assume_reserved(ptr - buf.data());
}
@ -200,7 +200,7 @@ void ProtobufWriter::SimpleWriter::writeUInt(UInt32 field_number, UInt64 value)
{
size_t old_size = buffer.size();
buffer.reserve(old_size + 2 * MAX_VARINT_SIZE);
UInt8 * ptr = &buffer[old_size];
UInt8 * ptr = buffer.data() + old_size;
ptr = writeFieldNumber(field_number, VARINT, ptr);
ptr = writeVarint(value, ptr);
buffer.resize_assume_reserved(ptr - buffer.data());
@ -223,7 +223,7 @@ void ProtobufWriter::SimpleWriter::writeFixed(UInt32 field_number, T value)
constexpr WireType wire_type = (sizeof(T) == 4) ? BITS32 : BITS64;
size_t old_size = buffer.size();
buffer.reserve(old_size + MAX_VARINT_SIZE + sizeof(T));
UInt8 * ptr = &buffer[old_size];
UInt8 * ptr = buffer.data() + old_size;
ptr = writeFieldNumber(field_number, wire_type, ptr);
memcpy(ptr, &value, sizeof(T));
ptr += sizeof(T);
@ -234,7 +234,7 @@ void ProtobufWriter::SimpleWriter::writeString(UInt32 field_number, const String
{
size_t old_size = buffer.size();
buffer.reserve(old_size + 2 * MAX_VARINT_SIZE + str.size);
UInt8 * ptr = &buffer[old_size];
UInt8 * ptr = buffer.data() + old_size;
ptr = writeFieldNumber(field_number, LENGTH_DELIMITED, ptr);
ptr = writeVarint(str.size, ptr);
memcpy(ptr, str.data, str.size);
@ -294,7 +294,7 @@ void ProtobufWriter::SimpleWriter::addFixedToRepeatedPack(T value)
static_assert((sizeof(T) == 4) || (sizeof(T) == 8));
size_t old_size = buffer.size();
buffer.resize(old_size + sizeof(T));
memcpy(&buffer[old_size], &value, sizeof(T));
memcpy(buffer.data() + old_size, &value, sizeof(T));
}

View File

@ -65,7 +65,7 @@ FunctionBasePtr FunctionBuilderJoinGet::buildImpl(const ColumnsWithTypeAndName &
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructure(false, context.getCurrentQueryId());
auto table_lock = storage_join->lockStructureForShare(false, context.getCurrentQueryId());
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

View File

@ -1,4 +1,5 @@
#include <Functions/IFunction.h>
#include <Storages/TableStructureLockHolder.h>
namespace DB
{
@ -7,8 +8,6 @@ class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
class Join;
using JoinPtr = std::shared_ptr<Join>;
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
class FunctionJoinGet final : public IFunction, public std::enable_shared_from_this<FunctionJoinGet>
{
@ -16,7 +15,7 @@ public:
static constexpr auto name = "joinGet";
FunctionJoinGet(
TableStructureReadLockPtr table_lock, StoragePtr storage_join, JoinPtr join, const String & attr_name, DataTypePtr return_type)
TableStructureReadLockHolder table_lock, StoragePtr storage_join, JoinPtr join, const String & attr_name, DataTypePtr return_type)
: table_lock(std::move(table_lock))
, storage_join(std::move(storage_join))
, join(std::move(join))
@ -36,7 +35,7 @@ private:
size_t getNumberOfArguments() const override { return 0; }
private:
TableStructureReadLockPtr table_lock;
TableStructureReadLockHolder table_lock;
StoragePtr storage_join;
JoinPtr join;
const String attr_name;

View File

@ -8,164 +8,271 @@
#include <Core/Defines.h>
#include <common/unaligned.h>
#include <algorithm>
#include <climits>
#include <cstring>
#include <limits>
#include <memory>
#include <utility>
#ifdef __SSE4_2__
#include <nmmintrin.h>
# include <nmmintrin.h>
#endif
namespace DB
{
/** Distance function implementation.
* We calculate all the trigrams from left string and count by the index of
* We calculate all the n-grams from left string and count by the index of
* 16 bits hash of them in the map.
* Then calculate all the trigrams from the right string and calculate
* the trigram distance on the flight by adding and subtracting from the hashmap.
* Then calculate all the n-grams from the right string and calculate
* the n-gram distance on the flight by adding and subtracting from the hashmap.
* Then return the map into the condition of which it was after the left string
* calculation. If the right string size is big (more than 2**15 bytes),
* the strings are not similar at all and we return 1.
*/
struct TrigramDistanceImpl
template <size_t N, class CodePoint, bool UTF8, bool CaseInsensitive>
struct NgramDistanceImpl
{
using ResultType = Float32;
using CodePoint = UInt32;
/// map_size for trigram difference
/// map_size for ngram difference.
static constexpr size_t map_size = 1u << 16;
/// If the haystack size is bigger than this, behaviour is unspecified for this function
/// If the haystack size is bigger than this, behaviour is unspecified for this function.
static constexpr size_t max_string_size = 1u << 15;
/// Default padding to read safely.
static constexpr size_t default_padding = 16;
/// Max codepoints to store at once. 16 is for batching usage and PODArray has this padding.
static constexpr size_t simultaneously_codepoints_num = default_padding + N - 1;
/** This fits mostly in L2 cache all the time.
* Actually use UInt16 as addings and subtractions do not UB overflow. But think of it as a signed
* integer array.
*/
using TrigramStats = UInt16[map_size];
using NgramStats = UInt16[map_size];
static ALWAYS_INLINE UInt16 trigramHash(CodePoint one, CodePoint two, CodePoint three)
static ALWAYS_INLINE UInt16 ASCIIHash(const CodePoint * code_points)
{
UInt64 combined = (static_cast<UInt64>(one) << 32) | two;
return intHashCRC32(unalignedLoad<UInt32>(code_points)) & 0xFFFFu;
}
static ALWAYS_INLINE UInt16 UTF8Hash(const CodePoint * code_points)
{
UInt64 combined = (static_cast<UInt64>(code_points[0]) << 32) | code_points[1];
#ifdef __SSE4_2__
return _mm_crc32_u64(three, combined) & 0xFFFFu;
return _mm_crc32_u64(code_points[2], combined) & 0xFFFFu;
#else
return (intHashCRC32(combined) ^ intHashCRC32(three)) & 0xFFFFu;
return (intHashCRC32(combined) ^ intHashCRC32(code_points[2])) & 0xFFFFu;
#endif
}
static ALWAYS_INLINE CodePoint readCodePoint(const char *& pos, const char * end) noexcept
template <size_t Offset, class Container, size_t... I>
static ALWAYS_INLINE inline void unrollLowering(Container & cont, const std::index_sequence<I...> &)
{
size_t length = UTF8::seqLength(*pos);
if (pos + length > end)
length = end - pos;
CodePoint res;
/// This is faster than just memcpy because of compiler optimizations with moving bytes.
switch (length)
{
case 1:
res = 0;
memcpy(&res, pos, 1);
break;
case 2:
res = 0;
memcpy(&res, pos, 2);
break;
case 3:
res = 0;
memcpy(&res, pos, 3);
break;
default:
memcpy(&res, pos, 4);
}
pos += length;
return res;
((cont[Offset + I] = std::tolower(cont[Offset + I])), ...);
}
static inline size_t calculateNeedleStats(const char * data, const size_t size, TrigramStats & trigram_stats) noexcept
static ALWAYS_INLINE size_t readASCIICodePoints(CodePoint * code_points, const char *& pos, const char * end)
{
size_t len = 0;
const char * start = data;
const char * end = data + size;
CodePoint cp1 = 0;
CodePoint cp2 = 0;
CodePoint cp3 = 0;
/// Offset before which we copy some data.
constexpr size_t padding_offset = default_padding - N + 1;
/// We have an array like this for ASCII (N == 4, other cases are similar)
/// |a0|a1|a2|a3|a4|a5|a6|a7|a8|a9|a10|a11|a12|a13|a14|a15|a16|a17|a18|
/// And we copy ^^^^^^^^^^^^^^^ these bytes to the start
/// Actually it is enough to copy 3 bytes, but memcpy for 4 bytes translates into 1 instruction
memcpy(code_points, code_points + padding_offset, roundUpToPowerOfTwoOrZero(N - 1) * sizeof(CodePoint));
/// Now we have an array
/// |a13|a14|a15|a16|a4|a5|a6|a7|a8|a9|a10|a11|a12|a13|a14|a15|a16|a17|a18|
/// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/// Doing unaligned read of 16 bytes and copy them like above
/// 16 is also chosen to do two `movups`.
/// Such copying allow us to have 3 codepoints from the previous read to produce the 4-grams with them.
memcpy(code_points + (N - 1), pos, default_padding * sizeof(CodePoint));
while (start != end)
if constexpr (CaseInsensitive)
{
cp1 = cp2;
cp2 = cp3;
cp3 = readCodePoint(start, end);
++len;
if (len < 3)
continue;
++trigram_stats[trigramHash(cp1, cp2, cp3)];
/// We really need template lambdas with C++20 to do it inline
unrollLowering<N - 1>(code_points, std::make_index_sequence<padding_offset>());
}
return std::max(static_cast<Int64>(0), static_cast<Int64>(len) - 2);
pos += padding_offset;
if (pos > end)
return default_padding - (pos - end);
return default_padding;
}
static inline UInt64 calculateHaystackStatsAndMetric(const char * data, const size_t size, TrigramStats & trigram_stats, size_t & distance)
static ALWAYS_INLINE size_t readUTF8CodePoints(CodePoint * code_points, const char *& pos, const char * end)
{
size_t len = 0;
size_t trigram_cnt = 0;
/// The same copying as described in the function above.
memcpy(code_points, code_points + default_padding - N + 1, roundUpToPowerOfTwoOrZero(N - 1) * sizeof(CodePoint));
size_t num = N - 1;
while (num < default_padding && pos < end)
{
size_t length = UTF8::seqLength(*pos);
if (pos + length > end)
length = end - pos;
CodePoint res;
/// This is faster than just memcpy because of compiler optimizations with moving bytes.
switch (length)
{
case 1:
res = 0;
memcpy(&res, pos, 1);
break;
case 2:
res = 0;
memcpy(&res, pos, 2);
break;
case 3:
res = 0;
memcpy(&res, pos, 3);
break;
default:
memcpy(&res, pos, 4);
}
/// This is not a really true case insensitive utf8. We zero the 5-th bit of every byte.
/// For ASCII it works https://catonmat.net/ascii-case-conversion-trick. For most cyrrilic letters also does.
/// For others, we don't care now. Lowering UTF is not a cheap operation.
if constexpr (CaseInsensitive)
{
switch (length)
{
case 4:
res &= ~(1u << (5 + 3 * CHAR_BIT));
[[fallthrough]];
case 3:
res &= ~(1u << (5 + 2 * CHAR_BIT));
[[fallthrough]];
case 2:
res &= ~(1u << (5 + CHAR_BIT));
[[fallthrough]];
default:
res &= ~(1u << 5);
}
}
pos += length;
code_points[num++] = res;
}
return num;
}
static ALWAYS_INLINE inline size_t calculateNeedleStats(
const char * data,
const size_t size,
NgramStats & ngram_stats,
size_t (*read_code_points)(CodePoint *, const char *&, const char *),
UInt16 (*hash_functor)(const CodePoint *))
{
// To prevent size_t overflow below.
if (size < N)
return 0;
const char * start = data;
const char * end = data + size;
CodePoint cp1 = 0;
CodePoint cp2 = 0;
CodePoint cp3 = 0;
CodePoint cp[simultaneously_codepoints_num] = {};
/// read_code_points returns the position of cp where it stopped reading codepoints.
size_t found = read_code_points(cp, start, end);
/// We need to start for the first time here, because first N - 1 codepoints mean nothing.
size_t i = N - 1;
/// Initialize with this value because for the first time `found` does not initialize first N - 1 codepoints.
size_t len = -N + 1;
do
{
len += found - N + 1;
for (; i + N <= found; ++i)
++ngram_stats[hash_functor(cp + i)];
i = 0;
} while (start < end && (found = read_code_points(cp, start, end)));
return len;
}
static ALWAYS_INLINE inline UInt64 calculateHaystackStatsAndMetric(
const char * data,
const size_t size,
NgramStats & ngram_stats,
size_t & distance,
size_t (*read_code_points)(CodePoint *, const char *&, const char *),
UInt16 (*hash_functor)(const CodePoint *))
{
size_t ngram_cnt = 0;
const char * start = data;
const char * end = data + size;
CodePoint cp[simultaneously_codepoints_num] = {};
/// allocation tricks, most strings are relatively small
static constexpr size_t small_buffer_size = 256;
std::unique_ptr<UInt16[]> big_buffer;
UInt16 small_buffer[small_buffer_size];
UInt16 * trigram_storage = small_buffer;
UInt16 * ngram_storage = small_buffer;
if (size > small_buffer_size)
{
trigram_storage = new UInt16[size];
big_buffer.reset(trigram_storage);
ngram_storage = new UInt16[size];
big_buffer.reset(ngram_storage);
}
while (start != end)
/// read_code_points returns the position of cp where it stopped reading codepoints.
size_t found = read_code_points(cp, start, end);
/// We need to start for the first time here, because first N - 1 codepoints mean nothing.
size_t iter = N - 1;
do
{
cp1 = cp2;
cp2 = cp3;
cp3 = readCodePoint(start, end);
++len;
if (len < 3)
continue;
for (; iter + N <= found; ++iter)
{
UInt16 hash = hash_functor(cp + iter);
if (static_cast<Int16>(ngram_stats[hash]) > 0)
--distance;
else
++distance;
UInt16 hash = trigramHash(cp1, cp2, cp3);
if (static_cast<Int16>(trigram_stats[hash]) > 0)
--distance;
else
++distance;
trigram_storage[trigram_cnt++] = hash;
--trigram_stats[hash];
}
ngram_storage[ngram_cnt++] = hash;
--ngram_stats[hash];
}
iter = 0;
} while (start < end && (found = read_code_points(cp, start, end)));
/// Return the state of hash map to its initial.
for (size_t i = 0; i < trigram_cnt; ++i)
++trigram_stats[trigram_storage[i]];
return trigram_cnt;
for (size_t i = 0; i < ngram_cnt; ++i)
++ngram_stats[ngram_storage[i]];
return ngram_cnt;
}
static void constant_constant(const std::string & data, const std::string & needle, Float32 & res)
template <class Callback, class... Args>
static inline size_t dispatchSearcher(Callback callback, Args &&... args)
{
TrigramStats common_stats;
if constexpr (!UTF8)
return callback(std::forward<Args>(args)..., readASCIICodePoints, ASCIIHash);
else
return callback(std::forward<Args>(args)..., readUTF8CodePoints, UTF8Hash);
}
static void constant_constant(std::string data, std::string needle, Float32 & res)
{
NgramStats common_stats;
memset(common_stats, 0, sizeof(common_stats));
size_t second_size = calculateNeedleStats(needle.data(), needle.size(), common_stats);
/// We use unsafe versions of getting ngrams, so I decided to use padded strings.
const size_t needle_size = needle.size();
const size_t data_size = data.size();
needle.resize(needle_size + default_padding);
data.resize(data_size + default_padding);
size_t second_size = dispatchSearcher(calculateNeedleStats, needle.data(), needle_size, common_stats);
size_t distance = second_size;
if (data.size() <= max_string_size)
if (data_size <= max_string_size)
{
size_t first_size = calculateHaystackStatsAndMetric(data.data(), data.size(), common_stats, distance);
size_t first_size = dispatchSearcher(calculateHaystackStatsAndMetric, data.data(), data_size, common_stats, distance);
res = distance * 1.f / std::max(first_size + second_size, size_t(1));
}
else
@ -175,11 +282,18 @@ struct TrigramDistanceImpl
}
static void vector_constant(
const ColumnString::Chars & data, const ColumnString::Offsets & offsets, const std::string & needle, PaddedPODArray<Float32> & res)
const ColumnString::Chars & data, const ColumnString::Offsets & offsets, std::string needle, PaddedPODArray<Float32> & res)
{
TrigramStats common_stats;
/// zeroing our map
NgramStats common_stats;
memset(common_stats, 0, sizeof(common_stats));
const size_t needle_stats_size = calculateNeedleStats(needle.data(), needle.size(), common_stats);
/// We use unsafe versions of getting ngrams, so I decided to use padded_data even in needle case.
const size_t needle_size = needle.size();
needle.resize(needle_size + default_padding);
const size_t needle_stats_size = dispatchSearcher(calculateNeedleStats, needle.data(), needle_size, common_stats);
size_t distance = needle_stats_size;
size_t prev_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
@ -188,12 +302,13 @@ struct TrigramDistanceImpl
const size_t haystack_size = offsets[i] - prev_offset - 1;
if (haystack_size <= max_string_size)
{
size_t haystack_stats_size
= calculateHaystackStatsAndMetric(reinterpret_cast<const char *>(haystack), haystack_size, common_stats, distance);
size_t haystack_stats_size = dispatchSearcher(
calculateHaystackStatsAndMetric, reinterpret_cast<const char *>(haystack), haystack_size, common_stats, distance);
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, size_t(1));
}
else
{
/// if the strings are too big, we say they are completely not the same
res[i] = 1.f;
}
distance = needle_stats_size;
@ -203,16 +318,39 @@ struct TrigramDistanceImpl
};
struct TrigramDistanceName
struct NgramDistanceName
{
static constexpr auto name = "trigramDistance";
static constexpr auto name = "ngramDistance";
};
using FunctionTrigramsDistance = FunctionsStringSimilarity<TrigramDistanceImpl, TrigramDistanceName>;
struct NgramDistanceCaseInsensitiveName
{
static constexpr auto name = "ngramDistanceCaseInsensitive";
};
struct NgramDistanceUTF8Name
{
static constexpr auto name = "ngramDistanceUTF8";
};
struct NgramDistanceUTF8CaseInsensitiveName
{
static constexpr auto name = "ngramDistanceCaseInsensitiveUTF8";
};
using FunctionNgramDistance = FunctionsStringSimilarity<NgramDistanceImpl<4, UInt8, false, false>, NgramDistanceName>;
using FunctionNgramDistanceCaseInsensitive
= FunctionsStringSimilarity<NgramDistanceImpl<4, UInt8, false, true>, NgramDistanceCaseInsensitiveName>;
using FunctionNgramDistanceUTF8 = FunctionsStringSimilarity<NgramDistanceImpl<3, UInt32, true, false>, NgramDistanceUTF8Name>;
using FunctionNgramDistanceCaseInsensitiveUTF8
= FunctionsStringSimilarity<NgramDistanceImpl<3, UInt32, true, true>, NgramDistanceUTF8CaseInsensitiveName>;
void registerFunctionsStringSimilarity(FunctionFactory & factory)
{
factory.registerFunction<FunctionTrigramsDistance>();
factory.registerFunction<FunctionNgramDistance>();
factory.registerFunction<FunctionNgramDistanceCaseInsensitive>();
factory.registerFunction<FunctionNgramDistanceUTF8>();
factory.registerFunction<FunctionNgramDistanceCaseInsensitiveUTF8>();
}
}

View File

@ -12,8 +12,9 @@ namespace DB
/** Calculate similarity metrics:
*
* trigramDistance(haystack, needle) --- calculate so called 3-gram distance between haystack and needle.
* ngramDistance(haystack, needle) --- calculate n-gram distance between haystack and needle.
* Returns float number from 0 to 1 - the closer to zero, the more strings are similar to each other.
* Also support CaseInsensitive and UTF8 formats.
*/
namespace ErrorCodes

View File

@ -54,7 +54,7 @@ public:
array_column = checkAndGetColumn<ColumnArray>(temp_column.get());
}
block.getByPosition(result).column
= ColumnArray::create(first_column->replicate(array_column->getOffsets()), array_column->getOffsetsPtr());
= ColumnArray::create(first_column->replicate(array_column->getOffsets())->convertToFullColumnIfConst(), array_column->getOffsetsPtr());
}
};

View File

@ -55,14 +55,11 @@ struct Progress
/// Each value separately is changed atomically (but not whole object).
bool incrementPiecewiseAtomically(const Progress & rhs)
{
if (!rhs.rows)
return false;
rows += rhs.rows;
bytes += rhs.bytes;
total_rows += rhs.total_rows;
return true;
return rhs.rows ? true : false;
}
void reset()

View File

@ -34,7 +34,7 @@ private:
size_t old_size = vector.size();
vector.resize(old_size * 2);
internal_buffer = Buffer(reinterpret_cast<Position>(&vector[old_size]), reinterpret_cast<Position>(vector.data() + vector.size()));
internal_buffer = Buffer(reinterpret_cast<Position>(vector.data() + old_size), reinterpret_cast<Position>(vector.data() + vector.size()));
working_buffer = internal_buffer;
}

View File

@ -102,23 +102,23 @@ static inline T ALWAYS_INLINE packFixed(
switch (key_sizes[j])
{
case 1:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin() + index, 1);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<1>() + index, 1);
offset += 1;
break;
case 2:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin() + index * 2, 2);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<2>() + index * 2, 2);
offset += 2;
break;
case 4:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin() + index * 4, 4);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<4>() + index * 4, 4);
offset += 4;
break;
case 8:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin() + index * 8, 8);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<8>() + index * 8, 8);
offset += 8;
break;
default:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin() + index * key_sizes[j], key_sizes[j]);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<1>() + index * key_sizes[j], key_sizes[j]);
offset += key_sizes[j];
}
}
@ -168,23 +168,23 @@ static inline T ALWAYS_INLINE packFixed(
switch (key_sizes[j])
{
case 1:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin() + i, 1);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<1>() + i, 1);
offset += 1;
break;
case 2:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin() + i * 2, 2);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<2>() + i * 2, 2);
offset += 2;
break;
case 4:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin() + i * 4, 4);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<4>() + i * 4, 4);
offset += 4;
break;
case 8:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin() + i * 8, 8);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<8>() + i * 8, 8);
offset += 8;
break;
default:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin() + i * key_sizes[j], key_sizes[j]);
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<1>() + i * key_sizes[j], key_sizes[j]);
offset += key_sizes[j];
}
}

View File

@ -74,8 +74,9 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
alter_commands.validate(*table, context);
table->alter(alter_commands, database_name, table_name, context);
table->alter(alter_commands, database_name, table_name, context, table_lock_holder);
}
return {};

View File

@ -587,11 +587,11 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String as_table_name = create.as_table;
StoragePtr as_storage;
TableStructureReadLockPtr as_storage_lock;
TableStructureReadLockHolder as_storage_lock;
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructure(false, context.getCurrentQueryId());
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
}
/// Set and retrieve list of columns.

View File

@ -93,7 +93,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = context.getTable(database_name, table_name);
}
auto table_lock = table->lockStructure(false, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(false, context.getCurrentQueryId());
columns = table->getColumns().getAll();
column_defaults = table->getColumns().defaults;
column_comments = table->getColumns().comments;

View File

@ -69,7 +69,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
{
database_and_table.second->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = database_and_table.second->lockForAlter(context.getCurrentQueryId());
auto table_lock = database_and_table.second->lockExclusively(context.getCurrentQueryId());
/// Drop table from memory, don't touch data and metadata
database_and_table.first->detachTable(database_and_table.second->getTableName());
}
@ -78,7 +78,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
database_and_table.second->checkTableCanBeDropped();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = database_and_table.second->lockForAlter(context.getCurrentQueryId());
auto table_lock = database_and_table.second->lockExclusively(context.getCurrentQueryId());
/// Drop table data, don't touch metadata
database_and_table.second->truncate(query_ptr, context);
}
@ -89,7 +89,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
database_and_table.second->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = database_and_table.second->lockForAlter(context.getCurrentQueryId());
auto table_lock = database_and_table.second->lockExclusively(context.getCurrentQueryId());
/// Delete table metadata and table itself from memory
database_and_table.first->removeTable(context, database_and_table.second->getTableName());
@ -126,7 +126,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
if (kind == ASTDropQuery::Kind::Truncate)
{
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockForAlter(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
/// Drop table data, don't touch metadata
table->truncate(query_ptr, context);
}
@ -135,7 +135,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
context_handle.tryRemoveExternalTable(table_name);
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockForAlter(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
/// Delete table data
table->drop();
table->is_dropped = true;

View File

@ -96,7 +96,7 @@ BlockIO InterpreterInsertQuery::execute()
checkAccess(query);
StoragePtr table = getTable(query);
auto table_lock = table->lockStructure(true, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;

View File

@ -23,7 +23,7 @@ BlockIO InterpreterOptimizeQuery::execute()
return executeDDLQueryOnCluster(query_ptr, context, {ast.database});
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
return {};
}

View File

@ -96,12 +96,12 @@ BlockIO InterpreterRenameQuery::execute()
table_guards.emplace(to, context.getDDLGuard(to.database_name, to.table_name));
}
std::vector<TableFullWriteLock> locks;
std::vector<TableStructureWriteLockHolder> locks;
locks.reserve(unique_tables_from.size());
for (const auto & names : unique_tables_from)
if (auto table = context.tryGetTable(names.database_name, names.table_name))
locks.emplace_back(table->lockForAlter(context.getCurrentQueryId()));
locks.emplace_back(table->lockExclusively(context.getCurrentQueryId()));
/** All tables are locked. If there are more than one rename in chain,
* we need to hold global lock while doing all renames. Order matters to avoid deadlocks.

View File

@ -200,7 +200,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
if (storage)
table_lock = storage->lockStructure(false, context.getCurrentQueryId());
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, subquery_depth).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage);

View File

@ -248,7 +248,7 @@ private:
/// Table from where to read data, if not subquery.
StoragePtr storage;
TableStructureReadLockPtr table_lock;
TableStructureReadLockHolder table_lock;
/// Used when we read from prepared input, not table or subquery.
BlockInputStreamPtr input;

View File

@ -239,7 +239,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_nam
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockForAlter(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
create_ast = system_context.getCreateTableQuery(database_name, table_name);
database->detachTable(table_name);

View File

@ -5,16 +5,7 @@
namespace DB
{
TableStructureReadLock::TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const String & query_id)
: storage(storage_)
{
if (lock_data)
data_lock = storage->data_lock->getLock(RWLockImpl::Read, query_id);
if (lock_structure)
structure_lock = storage->structure_lock->getLock(RWLockImpl::Read, query_id);
}
void IStorage::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void IStorage::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
for (const auto & param : params)
{
@ -22,7 +13,7 @@ void IStorage::alter(const AlterCommands & params, const String & database_name,
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
auto lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();
params.apply(new_columns);

View File

@ -6,6 +6,7 @@
#include <Core/QueryProcessingStage.h>
#include <Databases/IDatabase.h>
#include <Storages/ITableDeclaration.h>
#include <Storages/TableStructureLockHolder.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/CancellationCode.h>
#include <shared_mutex>
@ -47,35 +48,6 @@ class MutationCommands;
class PartitionCommands;
/** Does not allow changing the table description (including rename and delete the table).
* If during any operation the table structure should remain unchanged, you need to hold such a lock for all of its time.
* For example, you need to hold such a lock for the duration of the entire SELECT or INSERT query and for the whole time the merge of the set of parts
* (but between the selection of parts for the merge and their merging, the table structure can change).
* NOTE: This is a lock to "read" the table's description. To change the table description, you need to take the TableStructureWriteLock.
*/
class TableStructureReadLock
{
private:
friend class IStorage;
StoragePtr storage;
/// Order is important.
RWLockImpl::LockHolder data_lock;
RWLockImpl::LockHolder structure_lock;
public:
TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const String & query_id);
};
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
using TableStructureWriteLock = RWLockImpl::LockHolder;
using TableDataWriteLock = RWLockImpl::LockHolder;
using TableFullWriteLock = std::pair<TableDataWriteLock, TableStructureWriteLock>;
/** Storage. Responsible for
* - storage of the table data;
* - the definition in which files (or not in files) the data is stored;
@ -111,50 +83,72 @@ public:
/** Returns true if the storage supports deduplication of inserted data blocks . */
virtual bool supportsDeduplication() const { return false; }
/** Does not allow you to change the structure or name of the table.
* If you change the data in the table, you will need to specify will_modify_data = true.
* This will take an extra lock that does not allow starting ALTER MODIFY.
*
* WARNING: You need to call methods from ITableDeclaration under such a lock. Without it, they are not thread safe.
* WARNING: To avoid deadlocks, this method must not be called under lock of Context.
*/
TableStructureReadLockPtr lockStructure(bool will_modify_data, const String & query_id)
/// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureReadLockPtr res = std::make_shared<TableStructureReadLock>(shared_from_this(), true, will_modify_data, query_id);
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;
return result;
}
/** Does not allow reading the table structure. It is taken for ALTER, RENAME and DROP, TRUNCATE.
*/
TableFullWriteLock lockForAlter(const String & query_id)
/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention(const String & query_id)
{
/// The calculation order is important.
auto res_data_lock = lockDataForAlter(query_id);
auto res_structure_lock = lockStructureForAlter(query_id);
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
return {std::move(res_data_lock), std::move(res_structure_lock)};
}
/** Does not allow changing the data in the table. (Moreover, does not give a look at the structure of the table with the intention to change the data).
* It is taken during write temporary data in ALTER MODIFY.
* Under this lock, you can take lockStructureForAlter() to change the structure of the table.
*/
TableDataWriteLock lockDataForAlter(const String & query_id)
{
auto res = data_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;
return result;
}
TableStructureWriteLock lockStructureForAlter(const String & query_id)
/// Upgrade alter intention lock and make sure that no new data is inserted into the table.
/// This is used by the ALTER MODIFY of the MergeTree storage to consistently determine
/// the set of parts that needs to be altered.
void lockNewDataStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
auto res = structure_lock->getLock(RWLockImpl::Write, query_id);
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.",
ErrorCodes::LOGICAL_ERROR);
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
}
/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed.
void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.",
ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
}
/// Acquire the full exclusive lock immediately. No other queries can run concurrently.
TableStructureWriteLockHolder lockExclusively(const String & query_id)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
return result;
}
/** Returns stage to which query is going to be processed in read() function.
@ -233,7 +227,7 @@ public:
* This method must fully execute the ALTER query, taking care of the locks itself.
* To update the table metadata on disk, this method should call InterpreterAlterQuery::updateMetadata.
*/
virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context);
virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & table_lock_holder);
/** ALTER tables with regard to its partitions.
* Should handle locks for each command on its own.
@ -345,28 +339,20 @@ public:
using std::enable_shared_from_this<IStorage>::shared_from_this;
private:
friend class TableStructureReadLock;
/// You always need to take the next three locks in this order.
/// You always need to take the next two locks in this order.
/// If you hold this lock exclusively, you can be sure that no other structure modifying queries
/// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure
/// (e.g. SELECT, INSERT) can continue to execute.
mutable RWLock alter_intention_lock = RWLockImpl::create();
/** It is taken for read for the entire INSERT query and the entire merge of the parts (for MergeTree).
* It is taken for write for the entire time ALTER MODIFY.
*
* Formally:
* Taking a write lock ensures that:
* 1) the data in the table will not change while the lock is alive,
* 2) all changes to the data after releasing the lock will be based on the structure of the table at the time after the lock was released.
* You need to take for read for the entire time of the operation that changes the data.
*/
mutable RWLock data_lock = RWLockImpl::create();
/// It is taken for share for the entire INSERT query and the entire merge of the parts (for MergeTree).
/// ALTER queries acquire an exclusive lock to ensure
mutable RWLock new_data_structure_lock = RWLockImpl::create();
/** Lock for multiple columns and path to table. It is taken for write at RENAME, ALTER (for ALTER MODIFY for a while) and DROP.
* It is taken for read for the whole time of SELECT, INSERT and merge parts (for MergeTree).
*
* Taking this lock for writing is a strictly "stronger" operation than taking parts_writing_lock for write record.
* That is, if this lock is taken for write, you should not worry about `parts_writing_lock`.
* parts_writing_lock is only needed for cases when you do not want to take `table_structure_lock` for long operations (ALTER MODIFY).
*/
/// Lock for the table column structure (names, types, etc.) and data path.
/// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP)
/// and in share mode by other queries.
mutable RWLock structure_lock = RWLockImpl::create();
};

View File

@ -79,7 +79,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
try
{
auto storage_lock = owned_storage->lockStructure(false, RWLockImpl::NO_QUERY);
auto storage_lock = owned_storage->lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartPtr part = findPart(part_name);

View File

@ -513,13 +513,16 @@ void MergeTreeDataPart::loadIndex()
for (size_t i = 0; i < marks_count; ++i) //-V756
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j].get(), index_file);
storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j], index_file);
for (size_t i = 0; i < key_size; ++i)
{
loaded_index[i]->protect();
if (loaded_index[i]->size() != marks_count)
throw Exception("Cannot read all data from index file " + index_path
+ "(expected size: " + toString(marks_count) + ", read: " + toString(loaded_index[i]->size()) + ")",
ErrorCodes::CANNOT_READ_ALL_DATA);
}
if (!index_file.eof())
throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);

View File

@ -368,14 +368,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
* It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals.
*/
bool use_sampling = relative_sample_size > 0 || settings.parallel_replicas_count > 1;
bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
bool no_data = false; /// There is nothing left after sampling.
if (use_sampling)
{
if (!data.supportsSampling())
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (sample_factor_column_queried && relative_sample_size != RelativeSize(0))
used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);

View File

@ -34,6 +34,19 @@ struct IMergeTreeIndexGranule
virtual void deserializeBinary(ReadBuffer & istr) = 0;
virtual bool empty() const = 0;
};
using MergeTreeIndexGranulePtr = std::shared_ptr<IMergeTreeIndexGranule>;
using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>;
/// Aggregates info about a single block of data.
struct IMergeTreeIndexAggregator
{
virtual ~IMergeTreeIndexAggregator() = default;
virtual bool empty() const = 0;
virtual MergeTreeIndexGranulePtr getGranuleAndReset() = 0;
/// Updates the stored info using rows of the specified block.
/// Reads no more than `limit` rows.
@ -41,8 +54,8 @@ struct IMergeTreeIndexGranule
virtual void update(const Block & block, size_t * pos, size_t limit) = 0;
};
using MergeTreeIndexGranulePtr = std::shared_ptr<IMergeTreeIndexGranule>;
using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>;
using MergeTreeIndexAggregatorPtr = std::shared_ptr<IMergeTreeIndexAggregator>;
using MergeTreeIndexAggregators = std::vector<MergeTreeIndexAggregatorPtr>;
/// Condition on the index.
@ -86,6 +99,7 @@ public:
virtual bool mayBenefitFromIndexForIn(const ASTPtr & node) const = 0;
virtual MergeTreeIndexGranulePtr createIndexGranule() const = 0;
virtual MergeTreeIndexAggregatorPtr createIndexAggregator() const = 0;
virtual IndexConditionPtr createIndexCondition(
const SelectQueryInfo & query_info, const Context & context) const = 0;

View File

@ -17,9 +17,11 @@ namespace ErrorCodes
MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index)
: IMergeTreeIndexGranule(), index(index), parallelogram()
{
}
: IMergeTreeIndexGranule(), index(index), parallelogram() {}
MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(
const MergeTreeMinMaxIndex & index, std::vector<Range> && parallelogram)
: IMergeTreeIndexGranule(), index(index), parallelogram(std::move(parallelogram)) {}
void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const
{
@ -51,7 +53,16 @@ void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr)
}
}
void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t limit)
MergeTreeMinMaxAggregator::MergeTreeMinMaxAggregator(const MergeTreeMinMaxIndex & index)
: index(index) {}
MergeTreeIndexGranulePtr MergeTreeMinMaxAggregator::getGranuleAndReset()
{
return std::make_shared<MergeTreeMinMaxGranule>(index, std::move(parallelogram));
}
void MergeTreeMinMaxAggregator::update(const Block & block, size_t * pos, size_t limit)
{
if (*pos >= block.rows())
throw Exception(
@ -109,6 +120,13 @@ MergeTreeIndexGranulePtr MergeTreeMinMaxIndex::createIndexGranule() const
return std::make_shared<MergeTreeMinMaxGranule>(*this);
}
MergeTreeIndexAggregatorPtr MergeTreeMinMaxIndex::createIndexAggregator() const
{
return std::make_shared<MergeTreeMinMaxAggregator>(*this);
}
IndexConditionPtr MergeTreeMinMaxIndex::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
{

View File

@ -16,14 +16,27 @@ class MergeTreeMinMaxIndex;
struct MergeTreeMinMaxGranule : public IMergeTreeIndexGranule
{
explicit MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index);
MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index, std::vector<Range> && parallelogram);
~MergeTreeMinMaxGranule() override = default;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
bool empty() const override { return parallelogram.empty(); }
void update(const Block & block, size_t * pos, size_t limit) override;
~MergeTreeMinMaxGranule() override = default;
const MergeTreeMinMaxIndex & index;
std::vector<Range> parallelogram;
};
struct MergeTreeMinMaxAggregator : IMergeTreeIndexAggregator
{
explicit MergeTreeMinMaxAggregator(const MergeTreeMinMaxIndex & index);
~MergeTreeMinMaxAggregator() override = default;
bool empty() const override { return parallelogram.empty(); }
MergeTreeIndexGranulePtr getGranuleAndReset() override;
void update(const Block & block, size_t * pos, size_t limit) override;
const MergeTreeMinMaxIndex & index;
std::vector<Range> parallelogram;
@ -64,6 +77,7 @@ public:
~MergeTreeMinMaxIndex() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
IndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, const Context & context) const override;

View File

@ -132,6 +132,7 @@ void MergeTreeReaderStream::loadMarks()
if (buffer.eof() || buffer.buffer().size() != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
res->protect();
return res;
};

View File

@ -22,10 +22,15 @@ const Field UNKNOWN_FIELD(3u);
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index)
: IMergeTreeIndexGranule(), index(index), set(new Set(SizeLimits{}, true))
{
set->setHeader(index.header);
}
: IMergeTreeIndexGranule()
, index(index)
, block(index.header.cloneEmpty()) {}
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(
const MergeTreeSetSkippingIndex & index, MutableColumns && mutable_columns)
: IMergeTreeIndexGranule()
, index(index)
, block(index.header.cloneWithColumns(std::move(mutable_columns))) {}
void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
{
@ -33,10 +38,9 @@ void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
throw Exception(
"Attempt to write empty set index `" + index.name + "`", ErrorCodes::LOGICAL_ERROR);
const auto & columns = set->getSetElements();
const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
if (size() > index.max_rows)
if (index.max_rows && size() > index.max_rows)
{
size_type->serializeBinary(0, ostr);
return;
@ -55,21 +59,15 @@ void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
IDataType::SerializeBinaryBulkStatePtr state;
type->serializeBinaryBulkStatePrefix(settings, state);
type->serializeBinaryBulkWithMultipleStreams(*columns[i], 0, size(), settings, state);
type->serializeBinaryBulkWithMultipleStreams(*block.getByPosition(i).column, 0, size(), settings, state);
type->serializeBinaryBulkStateSuffix(settings, state);
}
}
void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
{
if (!set->empty())
{
auto new_set = std::make_unique<Set>(SizeLimits{}, true);
new_set->setHeader(index.header);
set.swap(new_set);
}
block.clear();
Block block;
Field field_rows;
const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
size_type->deserializeBinary(field_rows, istr);
@ -93,47 +91,124 @@ void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
block.insert(ColumnWithTypeAndName(new_column->getPtr(), type, index.columns[i]));
}
set->insertFromBlock(block);
}
void MergeTreeSetIndexGranule::update(const Block & new_block, size_t * pos, size_t limit)
MergeTreeSetIndexAggregator::MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index)
: index(index), columns(index.header.cloneEmptyColumns())
{
if (*pos >= new_block.rows())
ColumnRawPtrs column_ptrs;
column_ptrs.reserve(index.columns.size());
Columns materialized_columns;
for (const auto & column : index.header.getColumns())
{
materialized_columns.emplace_back(column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality());
column_ptrs.emplace_back(materialized_columns.back().get());
}
data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
columns = index.header.cloneEmptyColumns();
}
void MergeTreeSetIndexAggregator::update(const Block & block, size_t * pos, size_t limit)
{
if (*pos >= block.rows())
throw Exception(
"The provided position is not less than the number of block rows. Position: "
+ toString(*pos) + ", Block rows: " + toString(new_block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);
+ toString(*pos) + ", Block rows: " + toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);
size_t rows_read = std::min(limit, new_block.rows() - *pos);
size_t rows_read = std::min(limit, block.rows() - *pos);
if (size() > index.max_rows)
if (index.max_rows && size() > index.max_rows)
{
*pos += rows_read;
return;
}
Block key_block;
for (size_t i = 0; i < index.columns.size(); ++i)
ColumnRawPtrs index_column_ptrs;
index_column_ptrs.reserve(index.columns.size());
Columns materialized_columns;
for (const auto & column_name : index.columns)
{
const auto & name = index.columns[i];
const auto & type = index.data_types[i];
key_block.insert(
ColumnWithTypeAndName(
new_block.getByName(name).column->cut(*pos, rows_read),
type,
name));
materialized_columns.emplace_back(
block.getByName(column_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality());
index_column_ptrs.emplace_back(materialized_columns.back().get());
}
set->insertFromBlock(key_block);
IColumn::Filter filter(block.rows(), 0);
bool has_new_data = false;
switch (data.type)
{
case ClearableSetVariants::Type::EMPTY:
break;
#define M(NAME) \
case ClearableSetVariants::Type::NAME: \
has_new_data = buildFilter(*data.NAME, index_column_ptrs, filter, *pos, rows_read, data); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
if (has_new_data)
{
for (size_t i = 0; i < columns.size(); ++i)
{
auto filtered_column = block.getByName(index.columns[i]).column->filter(filter, block.rows());
columns[i]->insertRangeFrom(*filtered_column, 0, filtered_column->size());
}
}
*pos += rows_read;
}
Block MergeTreeSetIndexGranule::getElementsBlock() const
template <typename Method>
bool MergeTreeSetIndexAggregator::buildFilter(
Method & method,
const ColumnRawPtrs & column_ptrs,
IColumn::Filter & filter,
size_t pos,
size_t limit,
ClearableSetVariants & variants) const
{
if (size() > index.max_rows)
return index.header;
return index.header.cloneWithColumns(set->getSetElements());
/// Like DistinctSortedBlockInputStream.
typename Method::State state(column_ptrs, key_sizes, nullptr);
bool has_new_data = false;
for (size_t i = 0; i < limit; ++i)
{
auto emplace_result = state.emplaceKey(method.data, pos + i, variants.string_pool);
if (emplace_result.isInserted())
has_new_data = true;
/// Emit the record if there is no such key in the current set yet.
/// Skip it otherwise.
filter[pos + i] = emplace_result.isInserted();
}
return has_new_data;
}
MergeTreeIndexGranulePtr MergeTreeSetIndexAggregator::getGranuleAndReset()
{
auto granule = std::make_shared<MergeTreeSetIndexGranule>(index, std::move(columns));
switch (data.type)
{
case ClearableSetVariants::Type::EMPTY:
break;
#define M(NAME) \
case ClearableSetVariants::Type::NAME: \
data.NAME->data.clear(); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
columns = index.header.cloneEmptyColumns();
return granule;
}
@ -190,10 +265,10 @@ bool SetIndexCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule)
throw Exception(
"Set index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
if (useless || !granule->size() || granule->size() > index.max_rows)
if (useless || !granule->size() || (index.max_rows && granule->size() > index.max_rows))
return true;
Block result = granule->getElementsBlock();
Block result = granule->block;
actions->execute(result);
auto column = result.getByName(expression_ast->getColumnName()).column->convertToFullColumnIfLowCardinality();
@ -377,8 +452,13 @@ MergeTreeIndexGranulePtr MergeTreeSetSkippingIndex::createIndexGranule() const
return std::make_shared<MergeTreeSetIndexGranule>(*this);
}
MergeTreeIndexAggregatorPtr MergeTreeSetSkippingIndex::createIndexAggregator() const
{
return std::make_shared<MergeTreeSetIndexAggregator>(*this);
}
IndexConditionPtr MergeTreeSetSkippingIndex::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
const SelectQueryInfo & query, const Context & context) const
{
return std::make_shared<SetIndexCondition>(query, context, *this);
};

View File

@ -3,7 +3,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Set.h>
#include <Interpreters/SetVariants.h>
#include <memory>
#include <set>
@ -17,20 +17,48 @@ class MergeTreeSetSkippingIndex;
struct MergeTreeSetIndexGranule : public IMergeTreeIndexGranule
{
explicit MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index);
MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index, MutableColumns && columns);
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
size_t size() const { return set->getTotalRowCount(); }
size_t size() const { return block.rows(); }
bool empty() const override { return !size(); }
void update(const Block & block, size_t * pos, size_t limit) override;
Block getElementsBlock() const;
~MergeTreeSetIndexGranule() override = default;
const MergeTreeSetSkippingIndex & index;
std::unique_ptr<Set> set;
Block block;
};
struct MergeTreeSetIndexAggregator : IMergeTreeIndexAggregator
{
explicit MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index);
~MergeTreeSetIndexAggregator() override = default;
size_t size() const { return data.getTotalRowCount(); }
bool empty() const override { return !size(); }
MergeTreeIndexGranulePtr getGranuleAndReset() override;
void update(const Block & block, size_t * pos, size_t limit) override;
private:
/// return true if has new data
template <typename Method>
bool buildFilter(
Method & method,
const ColumnRawPtrs & column_ptrs,
IColumn::Filter & filter,
size_t pos,
size_t limit,
ClearableSetVariants & variants) const;
const MergeTreeSetSkippingIndex & index;
ClearableSetVariants data;
Sizes key_sizes;
MutableColumns columns;
};
@ -79,6 +107,7 @@ public:
~MergeTreeSetSkippingIndex() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
IndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, const Context & context) const override;

View File

@ -330,11 +330,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
for (size_t i = 0; i < storage.skip_indices.size(); ++i)
{
auto & stream = *skip_indices_streams[i];
if (skip_indices_granules[i] && !skip_indices_granules[i]->empty())
{
skip_indices_granules[i]->serializeBinary(stream.compressed);
skip_indices_granules[i].reset();
}
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
}
@ -362,7 +359,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
}
skip_indices_streams.clear();
skip_indices_granules.clear();
skip_indices_aggregators.clear();
skip_index_filling.clear();
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
@ -432,8 +429,7 @@ void MergedBlockOutputStream::init()
part_path + stream_name, MARKS_FILE_EXTENSION,
codec, max_compress_block_size,
0, aio_threshold));
skip_indices_granules.emplace_back(nullptr);
skip_indices_aggregators.push_back(index->createIndexAggregator());
skip_index_filling.push_back(0);
}
}
@ -563,9 +559,9 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
else
{
limit = storage.index_granularity;
if (!skip_indices_granules[i])
if (skip_indices_aggregators[i]->empty())
{
skip_indices_granules[i] = index->createIndexGranule();
skip_indices_aggregators[i] = index->createIndexAggregator();
skip_index_filling[i] = 0;
if (stream.compressed.offset() >= min_compress_block_size)
@ -577,7 +573,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
size_t pos = prev_pos;
skip_indices_granules[i]->update(indices_update_block, &pos, limit);
skip_indices_aggregators[i]->update(indices_update_block, &pos, limit);
if (pos == prev_pos + limit)
{
@ -586,8 +582,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// write index if it is filled
if (skip_index_filling[i] == index->granularity)
{
skip_indices_granules[i]->serializeBinary(stream.compressed);
skip_indices_granules[i].reset();
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_filling[i] = 0;
}
}

View File

@ -151,7 +151,7 @@ private:
MutableColumns index_columns;
std::vector<std::unique_ptr<ColumnStream>> skip_indices_streams;
MergeTreeIndexGranules skip_indices_granules;
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
};

View File

@ -108,7 +108,7 @@ void ReplicatedMergeTreeAlterThread::run()
LOG_INFO(log, "Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.");
auto table_lock = storage.lockStructureForAlter(RWLockImpl::NO_QUERY);
auto table_lock = storage.lockExclusively(RWLockImpl::NO_QUERY);
if (columns_in_zk == storage.getColumns() && metadata_diff.empty())
{
@ -134,7 +134,7 @@ void ReplicatedMergeTreeAlterThread::run()
/// Update parts.
if (changed_columns_version || force_recheck_parts)
{
auto table_lock = storage.lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
if (changed_columns_version)
LOG_INFO(log, "ALTER-ing parts");

View File

@ -202,7 +202,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
else if (part->name == part_name)
{
auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->columns, part->checksums);

View File

@ -101,7 +101,8 @@ static void appendGraphitePattern(
{
if (key == "regexp")
{
pattern.regexp = std::make_shared<OptimizedRegularExpression>(config.getString(config_element + ".regexp"));
pattern.regexp_str = config.getString(config_element + ".regexp");
pattern.regexp = std::make_shared<OptimizedRegularExpression>(pattern.regexp_str);
}
else if (key == "function")
{
@ -165,6 +166,7 @@ static void setGraphitePatternsFromConfig(const Context & context,
throw Exception("No '" + config_element + "' element in configuration file",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
params.config_name = config_element;
params.path_column_name = config.getString(config_element + ".path_column_name", "Path");
params.time_column_name = config.getString(config_element + ".time_column_name", "Time");
params.value_column_name = config.getString(config_element + ".value_column_name", "Value");

View File

@ -150,7 +150,7 @@ BlockInputStreams StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
auto destination_lock = destination->lockStructure(false, context.getCurrentQueryId());
auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
{
@ -677,9 +677,9 @@ void StorageBuffer::flushThread()
}
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
auto lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
/// So that no blocks of the old structure remain.
optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);

View File

@ -81,7 +81,9 @@ public:
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
private:
String name;

View File

@ -336,9 +336,11 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
}
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & current_table_name, const Context & context)
void StorageDistributed::alter(
const AlterCommands & params, const String & database_name, const String & current_table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
auto lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();

View File

@ -81,7 +81,9 @@ public:
void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & new_table_name) override { table_name = new_table_name; }
/// in the sub-tables, you need to manually add and delete columns
/// the structure of the sub-table is not checked
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void startup() override;
void shutdown() override;

View File

@ -190,7 +190,7 @@ BlockInputStreams StorageMaterializedView::read(
const unsigned num_streams)
{
auto storage = getTargetTable();
auto lock = storage->lockStructure(false, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & stream : streams)
stream->addTableLock(lock);
@ -200,7 +200,7 @@ BlockInputStreams StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)
{
auto storage = getTargetTable();
auto lock = storage->lockStructure(true, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId());
auto stream = storage->write(query, context);
stream->addTableLock(lock);
return stream;

View File

@ -224,7 +224,7 @@ BlockInputStreams StorageMerge::read(
current_streams = std::max(size_t(1), current_streams);
StoragePtr storage = it->first;
TableStructureReadLockPtr struct_lock = it->second;
TableStructureReadLockHolder struct_lock = it->second;
BlockInputStreams source_streams;
@ -262,7 +262,7 @@ BlockInputStreams StorageMerge::read(
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureReadLockPtr & struct_lock, Names & real_column_names,
const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams)
{
@ -345,7 +345,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructure(false, query_id));
selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id));
}
iterator->next();
@ -375,7 +375,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
if (storage.get() != this)
{
virtual_column->insert(storage->getTableName());
selected_tables.emplace_back(storage, get_lock ? storage->lockStructure(false, query_id) : TableStructureReadLockPtr{});
selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{});
}
}
@ -395,9 +395,11 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
return selected_tables;
}
void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageMerge::alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
auto lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();

View File

@ -44,7 +44,9 @@ public:
/// you need to add and remove columns in the sub-tables manually
/// the structure of sub-tables is not checked
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
@ -54,7 +56,7 @@ private:
OptimizedRegularExpression table_name_regexp;
Context global_context;
using StorageListWithLocks = std::list<std::pair<StoragePtr, TableStructureReadLockPtr>>;
using StorageListWithLocks = std::list<std::pair<StoragePtr, TableStructureReadLockHolder>>;
StorageListWithLocks getSelectedTables(const String & query_id) const;
@ -76,7 +78,7 @@ protected:
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureReadLockPtr & struct_lock, Names & real_column_names,
const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams = false);

View File

@ -195,11 +195,12 @@ void StorageMergeTree::alter(
const AlterCommands & params,
const String & current_database_name,
const String & current_table_name,
const Context & context)
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
if (!params.is_mutable())
{
auto table_soft_lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();
params.apply(new_columns);
@ -211,7 +212,7 @@ void StorageMergeTree::alter(
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto table_soft_lock = lockDataForAlter(context.getCurrentQueryId());
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
data.checkAlter(params, context);
@ -230,7 +231,7 @@ void StorageMergeTree::alter(
transactions.push_back(std::move(transaction));
}
auto table_hard_lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
{
@ -452,7 +453,7 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
auto structure_lock = lockStructure(true, RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part;
@ -562,7 +563,7 @@ bool StorageMergeTree::merge(
bool StorageMergeTree::tryMutatePart()
{
auto structure_lock = lockStructure(true, RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part;
MutationCommands commands;
@ -774,7 +775,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructure(false, context.getCurrentQueryId());
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
String partition_id = data.getPartitionIDFromQuery(partition, context);
auto parts = data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
@ -879,7 +880,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructure(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
data.freezePartition(command.partition, command.with_name, context);
}
break;
@ -890,7 +891,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructure(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
data.freezeAll(command.with_name, context);
}
break;
@ -908,7 +909,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter(context.getCurrentQueryId());
auto lock = lockExclusively(context.getCurrentQueryId());
String partition_id = data.getPartitionIDFromQuery(partition, context);
@ -991,8 +992,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructure(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructure(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);

View File

@ -77,7 +77,9 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkTableCanBeDropped() const override;

View File

@ -30,9 +30,11 @@ void registerStorageNull(StorageFactory & factory)
});
}
void StorageNull::alter(const AlterCommands & params, const String & current_database_name, const String & current_table_name, const Context & context)
void StorageNull::alter(
const AlterCommands & params, const String & current_database_name, const String & current_table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
auto lock = lockStructureForAlter(context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
ColumnsDescription new_columns = getColumns();
IndicesDescription new_indices = getIndicesDescription();

View File

@ -41,7 +41,9 @@ public:
table_name = new_table_name;
}
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override;
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
private:
String table_name;

View File

@ -1089,7 +1089,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge);
auto table_lock = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_merged_part(parts);
if (future_merged_part.name != entry.new_part_name)
@ -1219,7 +1219,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result);
auto table_lock = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::MutableDataPartPtr new_part;
MergeTreeData::Transaction transaction(data);
@ -1528,7 +1528,7 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
/// We don't change table structure, only data in some parts
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
/// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238)
auto lock_read_structure = lockStructure(false, RWLockImpl::NO_QUERY);
auto lock_read_structure = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper();
@ -1624,7 +1624,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions parts_to_add;
MergeTreeData::DataPartsVector parts_to_remove;
auto structure_lock_dst_table = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock_holder_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY);
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
{
@ -1662,7 +1662,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
StoragePtr source_table;
TableStructureReadLockPtr structure_lock_src_table;
TableStructureReadLockHolder table_lock_holder_src_table;
String source_table_name = entry_replace.from_database + "." + entry_replace.from_table;
auto clone_data_parts_from_source_table = [&] () -> size_t
@ -1686,7 +1686,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return 0;
}
structure_lock_src_table = source_table->lockStructure(false, RWLockImpl::NO_QUERY);
table_lock_holder_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated};
@ -2719,9 +2719,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
LOG_DEBUG(log, "Fetching part " << part_name << " from " << source_replica_path);
TableStructureReadLockPtr table_lock;
TableStructureReadLockHolder table_lock_holder;
if (!to_detached)
table_lock = lockStructure(true, RWLockImpl::NO_QUERY);
table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
/// Logging
Stopwatch stopwatch;
@ -3087,8 +3087,9 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
}
void StorageReplicatedMergeTree::alter(const AlterCommands & params,
const String & /*database_name*/, const String & /*table_name*/, const Context & query_context)
void StorageReplicatedMergeTree::alter(
const AlterCommands & params, const String & /*database_name*/, const String & /*table_name*/,
const Context & query_context, TableStructureWriteLockHolder & table_lock_holder)
{
assertNotReadonly();
@ -3124,7 +3125,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
{
/// Just to read current structure. Alter will be done in separate thread.
auto table_lock = lockStructure(false, query_context.getCurrentQueryId());
auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId());
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
@ -3166,6 +3167,8 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
table_lock_holder.release();
/// Wait until all replicas will apply ALTER.
for (const auto & node : changed_nodes)
@ -3382,7 +3385,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructure(false, query_context.getCurrentQueryId());
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
data.freezePartition(command.partition, command.with_name, query_context);
}
break;
@ -3393,7 +3396,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructure(false, query_context.getCurrentQueryId());
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
data.freezeAll(command.with_name, query_context);
}
break;
@ -4448,7 +4451,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
auto table_lock = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper();
MergeTreeData::DataPartsVector parts = data.grabOldParts();
@ -4740,8 +4743,8 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace,
const Context & context)
{
auto lock1 = lockStructure(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructure(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);

View File

@ -116,7 +116,9 @@ public:
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & query_context) override;
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override;
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;

View File

@ -100,11 +100,11 @@ protected:
{
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
TableStructureReadLockPtr table_lock;
TableStructureReadLockHolder table_lock;
try
{
table_lock = storage->lockStructure(false, query_id);
table_lock = storage->lockStructureForShare(false, query_id);
}
catch (const Exception & e)
{

View File

@ -1,175 +1,139 @@
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Field.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Poco/Util/Application.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
}
namespace
{
using namespace Poco::Util;
struct Pattern
{
struct Retention
{
UInt64 age;
UInt64 precision;
};
std::string regexp;
std::string function;
std::vector<Retention> retentions;
UInt16 priority;
UInt8 is_default;
};
static Pattern readOnePattern(
const AbstractConfiguration & config,
const std::string & path)
{
Pattern pattern;
AbstractConfiguration::Keys keys;
config.keys(path, keys);
if (keys.empty())
throw Exception("Empty pattern in Graphite rollup configuration", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
for (const auto & key : keys)
{
const String key_path = path + "." + key;
if (startsWith(key, "regexp"))
{
pattern.regexp = config.getString(key_path);
}
else if (startsWith(key, "function"))
{
pattern.function = config.getString(key_path);
}
else if (startsWith(key, "retention"))
{
pattern.retentions.push_back(Pattern::Retention{0, 0});
pattern.retentions.back().age = config.getUInt64(key_path + ".age", 0);
pattern.retentions.back().precision = config.getUInt64(key_path + ".precision", 0);
}
}
return pattern;
}
static std::vector<Pattern> readPatterns(
const AbstractConfiguration & config,
const std::string & section)
{
AbstractConfiguration::Keys keys;
std::vector<Pattern> result;
size_t count = 0;
config.keys(section, keys);
for (const auto & key : keys)
{
if (startsWith(key, "pattern"))
{
Pattern pattern(readOnePattern(config, section + "." + key));
pattern.is_default = false;
pattern.priority = ++count;
result.push_back(pattern);
}
else if (startsWith(key, "default"))
{
Pattern pattern(readOnePattern(config, section + "." + key));
pattern.is_default = true;
pattern.priority = std::numeric_limits<UInt16>::max();
result.push_back(pattern);
}
}
return result;
}
static Strings getAllGraphiteSections(const AbstractConfiguration & config)
{
Strings result;
AbstractConfiguration::Keys keys;
config.keys(keys);
for (const auto & key : keys)
{
if (startsWith(key, "graphite_"))
result.push_back(key);
}
return result;
}
} // namespace
NamesAndTypesList StorageSystemGraphite::getNamesAndTypes()
{
return {
{"config_name", std::make_shared<DataTypeString>()},
{"regexp", std::make_shared<DataTypeString>()},
{"function", std::make_shared<DataTypeString>()},
{"age", std::make_shared<DataTypeUInt64>()},
{"precision", std::make_shared<DataTypeUInt64>()},
{"priority", std::make_shared<DataTypeUInt16>()},
{"is_default", std::make_shared<DataTypeUInt8>()},
{"config_name", std::make_shared<DataTypeString>()},
{"regexp", std::make_shared<DataTypeString>()},
{"function", std::make_shared<DataTypeString>()},
{"age", std::make_shared<DataTypeUInt64>()},
{"precision", std::make_shared<DataTypeUInt64>()},
{"priority", std::make_shared<DataTypeUInt16>()},
{"is_default", std::make_shared<DataTypeUInt8>()},
{"Tables.database", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"Tables.table", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
};
}
/*
* Looking for (Replicated)*GraphiteMergeTree and get all configuration parameters for them
*/
StorageSystemGraphite::Configs StorageSystemGraphite::getConfigs(const Context & context) const
{
const Databases databases = context.getDatabases();
Configs graphite_configs;
for (const auto & db : databases)
{
for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next())
{
auto & table = iterator->table();
const MergeTreeData * table_data = nullptr;
if (const StorageMergeTree * merge_tree = dynamic_cast<StorageMergeTree *>(table.get()))
{
table_data = &merge_tree->getData();
}
else if (const StorageReplicatedMergeTree * replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
table_data = &replicated_merge_tree->getData();
}
else
{
continue;
}
if (table_data->merging_params.mode == MergeTreeData::MergingParams::Graphite)
{
const String & config_name = table_data->merging_params.graphite_params.config_name;
if (!graphite_configs.count(config_name))
{
Config new_config =
{
table_data->merging_params.graphite_params,
{ table_data->getDatabaseName() },
{ table_data->getTableName() },
};
graphite_configs.emplace(config_name, new_config);
}
else
{
graphite_configs[config_name].databases.emplace_back(table_data->getDatabaseName());
graphite_configs[config_name].tables.emplace_back(table_data->getTableName());
}
}
}
}
return graphite_configs;
}
void StorageSystemGraphite::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
const auto & config = context.getConfigRef();
Configs graphite_configs = StorageSystemGraphite::getConfigs(context);
Strings sections = getAllGraphiteSections(config);
for (const auto & section : sections)
for (const auto & config : graphite_configs)
{
const auto patterns = readPatterns(config, section);
for (const auto & pattern : patterns)
UInt16 priority = 0;
for (const auto & pattern : config.second.graphite_params.patterns)
{
bool is_default = pattern.regexp == nullptr;
String regexp;
String function;
if (is_default)
{
priority = std::numeric_limits<UInt16>::max();
}
else
{
priority++;
regexp = pattern.regexp_str;
}
if (pattern.function)
{
function = pattern.function->getName();
}
if (!pattern.retentions.empty())
{
for (const auto & ret : pattern.retentions)
for (const auto & retention : pattern.retentions)
{
res_columns[0]->insert(section);
res_columns[1]->insert(pattern.regexp);
res_columns[2]->insert(pattern.function);
res_columns[3]->insert(ret.age);
res_columns[4]->insert(ret.precision);
res_columns[5]->insert(pattern.priority);
res_columns[6]->insert(pattern.is_default);
size_t i = 0;
res_columns[i++]->insert(config.first);
res_columns[i++]->insert(regexp);
res_columns[i++]->insert(function);
res_columns[i++]->insert(retention.age);
res_columns[i++]->insert(retention.precision);
res_columns[i++]->insert(priority);
res_columns[i++]->insert(is_default);
res_columns[i++]->insert(config.second.databases);
res_columns[i++]->insert(config.second.tables);
}
}
else
{
res_columns[0]->insert(section);
res_columns[1]->insert(pattern.regexp);
res_columns[2]->insert(pattern.function);
res_columns[3]->insert(0);
res_columns[4]->insert(0);
res_columns[5]->insert(pattern.priority);
res_columns[6]->insert(pattern.is_default);
size_t i = 0;
res_columns[i++]->insert(config.first);
res_columns[i++]->insert(regexp);
res_columns[i++]->insert(function);
res_columns[i++]->insert(NULL);
res_columns[i++]->insert(NULL);
res_columns[i++]->insert(priority);
res_columns[i++]->insert(is_default);
res_columns[i++]->insert(config.second.databases);
res_columns[i++]->insert(config.second.tables);
}
}
}

View File

@ -1,7 +1,10 @@
#pragma once
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <ext/shared_ptr_helper.h>
namespace DB
@ -15,10 +18,21 @@ public:
static NamesAndTypesList getNamesAndTypes();
struct Config
{
Graphite::Params graphite_params;
Array databases;
Array tables;
};
using Configs = std::map<const String, Config>;
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
StorageSystemGraphite::Configs getConfigs(const Context & context) const;
};
}

View File

@ -49,7 +49,6 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multi
setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}}));
}
BlockInputStreams StorageSystemNumbers::read(
const Names & column_names,
const SelectQueryInfo &,
@ -75,7 +74,7 @@ BlockInputStreams StorageSystemNumbers::read(
res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);
if (limit) /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
res[i] = std::make_shared<LimitBlockInputStream>(res[i], *limit * (i + 1) / num_streams - *limit * i / num_streams, 0);
res[i] = std::make_shared<LimitBlockInputStream>(res[i], *limit * (i + 1) / num_streams - *limit * i / num_streams, 0, false, true);
}
return res;

View File

@ -167,7 +167,7 @@ public:
try
{
/// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructure(false, query_id);
info.table_lock = info.storage->lockStructureForShare(false, query_id);
}
catch (const Exception & e)
{

View File

@ -34,7 +34,7 @@ public:
struct StoragesInfo
{
StoragePtr storage;
TableStructureReadLockPtr table_lock;
TableStructureReadLockHolder table_lock;
String database;
String table;

View File

@ -0,0 +1,37 @@
#pragma once
#include <Common/RWLock.h>
namespace DB
{
/// Structs that hold table structure (columns, their types, default values etc.) locks when executing queries.
/// See IStorage::lock* methods for comments.
struct TableStructureWriteLockHolder
{
void release()
{
*this = TableStructureWriteLockHolder();
}
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder alter_intention_lock;
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
struct TableStructureReadLockHolder
{
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
}

View File

@ -231,6 +231,50 @@ SELECT * FROM test.graphite;
assert TSV(result) == TSV(expected)
def test_system_graphite_retentions(graphite_table):
expected = '''
graphite_rollup \\\\.count$ sum 0 0 1 0 ['test'] ['graphite']
graphite_rollup \\\\.max$ max 0 0 2 0 ['test'] ['graphite']
graphite_rollup ^five_min\\\\. 31536000 14400 3 0 ['test'] ['graphite']
graphite_rollup ^five_min\\\\. 5184000 3600 3 0 ['test'] ['graphite']
graphite_rollup ^five_min\\\\. 0 300 3 0 ['test'] ['graphite']
graphite_rollup ^one_min avg 31536000 600 4 0 ['test'] ['graphite']
graphite_rollup ^one_min avg 7776000 300 4 0 ['test'] ['graphite']
graphite_rollup ^one_min avg 0 60 4 0 ['test'] ['graphite']
'''
result = q('SELECT * from system.graphite_retentions')
assert TSV(result) == TSV(expected)
q('''
DROP TABLE IF EXISTS test.graphite2;
CREATE TABLE test.graphite2
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=8192;
''')
expected = '''
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
'''
result = q('''
SELECT
config_name,
Tables.database,
Tables.table
FROM system.graphite_retentions
''')
assert TSV(result) == TSV(expected)
def test_path_dangling_pointer(graphite_table):
q('''
DROP TABLE IF EXISTS test.graphite2;

Some files were not shown because too many files have changed in this diff Show More