Merge branch 'master' of https://github.com/yandex/ClickHouse into kafka_zstd

Commit 3b01eb62f2 by Odin Hultgren Van Der Horst, 2019-01-23 11:33:34 +01:00
267 changed files with 6103 additions and 4419 deletions

.gitignore (vendored)
@@ -180,18 +180,10 @@ utils/zookeeper-create-entry-to-download-part/zookeeper-create-entry-to-download
utils/zookeeper-dump-tree/zookeeper-dump-tree
utils/zookeeper-remove-by-list/zookeeper-remove-by-list
dbms/src/Storages/tests/remove_symlink_directory
debian/control
debian/copyright
debian/tmp/
libs/libcommon/src/tests/json_test
utils/compressor/zstd_test
utils/wikistat-loader/wikistat-loader
dbms/src/Common/tests/pod_array
debian/clickhouse-benchmark/
debian/clickhouse-client/
debian/clickhouse-server-base/
debian/clickhouse-server-common/
debian/files
dbms/src/Server/data/*
dbms/src/Server/metadata/*
@@ -210,9 +202,6 @@ vgcore*
*.changes
build-stamp
configure-stamp
debian/*.debhelper.log
debian/*.debhelper
debian/*.substvars
*.bin
*.mrk

@@ -1,50 +0,0 @@
language: generic
matrix:
fast_finish: true
include:
# We need to have gcc7 headers to compile c++17 code on clang
# - os: linux
#
# cache:
# ccache: true
# timeout: 1000
# directories:
# - /home/travis/.ccache
#
# addons:
# apt:
# update: true
# sources:
# - ubuntu-toolchain-r-test
# - llvm-toolchain-trusty-5.0
# packages: [ ninja-build, g++-7, clang-5.0, lld-5.0, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo, openssl]
#
# env:
# - MATRIX_EVAL="export CC=clang-5.0 CXX=clang++-5.0"
#
# script:
# - utils/travis/normal.sh
- os: linux
sudo: required
cache:
timeout: 1000
directories:
- /var/cache/pbuilder/ccache
addons:
apt:
update: true
packages: [ pbuilder, fakeroot, debhelper ]
script:
- utils/travis/pbuilder.sh
allow_failures:
- os: osx
before_script:
- eval "${MATRIX_EVAL}"

@@ -81,9 +81,13 @@ option (ENABLE_TESTS "Enables tests" ON)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (OS_LINUX)
if (OS_LINUX AND NOT UNBUNDLED)
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
endif()
if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
link_libraries(glibc-compatibility)
endif ()
endif ()
endif ()
if (GLIBC_COMPATIBILITY)
@@ -179,8 +183,8 @@ endif (TEST_COVERAGE)
if (ENABLE_TESTS)
message (STATUS "Tests are enabled")
enable_testing()
endif ()
enable_testing() # Enable for tests without binary
# when installing to /usr - place configs to /etc but for /usr/local place to /usr/local/etc
if (CMAKE_INSTALL_PREFIX STREQUAL "/usr")
@@ -251,7 +255,7 @@ include (libs/libdaemon/cmake/find_unwind.cmake)
include (cmake/print_flags.cmake)
add_subdirectory (contrib)
add_subdirectory (contrib EXCLUDE_FROM_ALL)
add_subdirectory (libs)
add_subdirectory (utils)
add_subdirectory (dbms)

@@ -10,3 +10,7 @@ ClickHouse is an open-source column-oriented database management system that all
* [Blog](https://clickhouse.yandex/blog/en/) contains various ClickHouse-related articles, as well as announces and reports about events.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [C++ ClickHouse and CatBoost Sprints](https://events.yandex.ru/events/ClickHouse/2-feb-2019/) in Moscow on February 2.

@@ -1,4 +1,4 @@
if (NOT ARCH_ARM AND NOT ARCH_32)
if (NOT ARCH_ARM AND NOT ARCH_32 AND NOT APPLE)
option (ENABLE_RDKAFKA "Enable kafka" ON)
endif ()

@@ -54,6 +54,8 @@ if (USE_INTERNAL_UNWIND_LIBRARY)
endif ()
if (USE_INTERNAL_ZLIB_LIBRARY)
set (ZLIB_ENABLE_TESTS 0 CACHE INTERNAL "")
set (SKIP_INSTALL_ALL 1 CACHE INTERNAL "")
set (ZLIB_COMPAT 1 CACHE INTERNAL "") # also enables WITH_GZFILEOP
set (WITH_NATIVE_INSTRUCTIONS ${ARCH_NATIVE} CACHE INTERNAL "")
if (OS_FREEBSD OR ARCH_I386)
@@ -74,15 +76,15 @@ if (USE_INTERNAL_ZLIB_LIBRARY)
target_compile_definitions (zlibstatic PUBLIC X86_64 UNALIGNED_OK)
endif ()
set_target_properties(example PROPERTIES EXCLUDE_FROM_ALL 1)
if (TARGET example64)
set_target_properties(example64 PROPERTIES EXCLUDE_FROM_ALL 1)
endif ()
#set_target_properties(example PROPERTIES EXCLUDE_FROM_ALL 1)
#if (TARGET example64)
# set_target_properties(example64 PROPERTIES EXCLUDE_FROM_ALL 1)
#endif ()
set_target_properties(minigzip PROPERTIES EXCLUDE_FROM_ALL 1)
if (TARGET minigzip64)
set_target_properties(minigzip64 PROPERTIES EXCLUDE_FROM_ALL 1)
endif ()
#set_target_properties(minigzip PROPERTIES EXCLUDE_FROM_ALL 1)
#if (TARGET minigzip64)
# set_target_properties(minigzip64 PROPERTIES EXCLUDE_FROM_ALL 1)
#endif ()
endif ()
if (USE_INTERNAL_CCTZ_LIBRARY)
@@ -105,8 +107,7 @@ if (USE_INTERNAL_SSL_LIBRARY)
if (NOT MAKE_STATIC_LIBRARIES)
set (BUILD_SHARED 1)
endif ()
set (USE_SHARED ${USE_STATIC_LIBRARIES})
set (LIBRESSL_SKIP_INSTALL 1)
set (LIBRESSL_SKIP_INSTALL 1 CACHE INTERNAL "")
add_subdirectory (ssl)
target_include_directories(${OPENSSL_CRYPTO_LIBRARY} SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(${OPENSSL_SSL_LIBRARY} SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})

contrib/ssl (vendored)
@@ -1 +1 @@
Subproject commit dbbbcdbbd17785566f8f9c107b714f9e213d7293
Subproject commit ba8de796195ff9d8bb0249ce289b83226b848b77

@@ -299,8 +299,8 @@ target_include_directories (dbms SYSTEM BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
if (USE_HDFS)
target_link_libraries (dbms PRIVATE ${HDFS3_LIBRARY})
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
target_link_libraries (clickhouse_common_io PRIVATE ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
endif()
if (USE_JEMALLOC)
@@ -318,11 +318,6 @@ target_include_directories (clickhouse_common_io BEFORE PRIVATE ${COMMON_INCLUDE
add_subdirectory (programs)
add_subdirectory (tests)
if (GLIBC_COMPATIBILITY AND NOT CLICKHOUSE_SPLIT_BINARY)
MESSAGE(STATUS "Some symbols from glibc will be replaced for compatibility")
target_link_libraries(dbms PUBLIC glibc-compatibility)
endif()
if (ENABLE_TESTS)
macro (grep_gtest_sources BASE_DIR DST_VAR)
# Cold match files that are not in tests/ directories

@@ -2,10 +2,10 @@
set(VERSION_REVISION 54413)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 1)
set(VERSION_PATCH 1)
set(VERSION_GITHASH 4e7747117123f5a1b027a64865844b4faa10447d)
set(VERSION_DESCRIBE v19.1.1-testing)
set(VERSION_STRING 19.1.1)
set(VERSION_PATCH 5)
set(VERSION_GITHASH 2a7e7364c139b3c97f54f38ca6ea76ab4fa61e4b)
set(VERSION_DESCRIBE v19.1.5-testing)
set(VERSION_STRING 19.1.5)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

@@ -27,7 +27,7 @@ elseif (EXISTS ${INTERNAL_COMPILER_BIN_ROOT}${INTERNAL_COMPILER_EXECUTABLE})
endif ()
if (COPY_HEADERS_COMPILER AND OS_LINUX)
add_custom_target (copy-headers env CLANG=${COPY_HEADERS_COMPILER} BUILD_PATH=${ClickHouse_BINARY_DIR} DESTDIR=${ClickHouse_SOURCE_DIR} ${ClickHouse_SOURCE_DIR}/copy_headers.sh ${ClickHouse_SOURCE_DIR} ${TMP_HEADERS_DIR} DEPENDS ${COPY_HEADERS_DEPENDS} WORKING_DIRECTORY ${ClickHouse_SOURCE_DIR} SOURCES ${ClickHouse_SOURCE_DIR}/copy_headers.sh)
add_custom_target (copy-headers [ -f ${TMP_HEADERS_DIR}/dbms/src/Interpreters/SpecializedAggregator.h ] || env CLANG=${COPY_HEADERS_COMPILER} BUILD_PATH=${ClickHouse_BINARY_DIR} DESTDIR=${ClickHouse_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/copy_headers.sh ${ClickHouse_SOURCE_DIR} ${TMP_HEADERS_DIR} DEPENDS ${COPY_HEADERS_DEPENDS} WORKING_DIRECTORY ${ClickHouse_SOURCE_DIR} SOURCES copy_headers.sh)
if (USE_INTERNAL_LLVM_LIBRARY)
set (CLANG_HEADERS_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm/clang/lib/Headers")

@@ -19,7 +19,7 @@ set -e
#
# sudo ./copy_headers.sh . /usr/share/clickhouse/headers/
SOURCE_PATH=${1:-.}
SOURCE_PATH=${1:-../../..}
DST=${2:-$SOURCE_PATH/../headers}
BUILD_PATH=${BUILD_PATH=${3:-$SOURCE_PATH/build}}

@@ -860,7 +860,7 @@ private:
}
/// Process the query that doesn't require transfering data blocks to the server.
/// Process the query that doesn't require transferring data blocks to the server.
void processOrdinaryQuery()
{
connection->sendQuery(query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
@@ -869,7 +869,7 @@ private:
}
/// Process the query that requires transfering data blocks to the server.
/// Process the query that requires transferring data blocks to the server.
void processInsertQuery()
{
/// Send part of query without data, because data will be sent separately.
@@ -1136,7 +1136,7 @@ private:
}
/// Process Log packets, exit when recieve Exception or EndOfStream
/// Process Log packets, exit when receive Exception or EndOfStream
bool receiveEndOfQuery()
{
while (true)

@@ -25,12 +25,12 @@ namespace ErrorCodes
struct ConnectionParameters
{
String host;
UInt16 port;
UInt16 port{};
String default_database;
String user;
String password;
Protocol::Secure security;
Protocol::Compression compression;
Protocol::Secure security = Protocol::Secure::Disable;
Protocol::Compression compression = Protocol::Compression::Enable;
ConnectionTimeouts timeouts;
ConnectionParameters() {}
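
The hunk above switches ConnectionParameters to in-class default initializers: `port{}` value-initializes to 0 and the enum members get explicit defaults, so a default-constructed object no longer carries indeterminate values. A minimal sketch of the same idiom with simplified stand-in types (names hypothetical):

    #include <cassert>
    #include <cstdint>
    #include <string>

    struct Params
    {
        std::string host;        // empty by default
        uint16_t port{};         // value-initialized to 0, never indeterminate
        bool compression = true; // explicit default, like Protocol::Compression::Enable
    };

    int main()
    {
        Params p;
        assert(p.port == 0 && p.compression);
    }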

@@ -93,11 +93,12 @@ private:
{
std::stringstream ss;
ss << hint;
String item;
while (!ss.eof())
{
String item;
ss >> item;
if (item.empty())
if (ss.eof())
break;
if (item == "serverError")
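
The fix above re-creates `item` on each iteration and checks the stream state right after extraction, so a failed final read can no longer hand a stale token to the hint parser. For comparison, the canonical shape of such a loop tests the extraction itself; a minimal sketch:

    #include <iostream>
    #include <sstream>
    #include <string>

    int main()
    {
        std::istringstream ss("serverError 42 clientError 7");
        std::string item;
        while (ss >> item) // extraction as the loop condition: no stale-token pitfall
            std::cout << item << '\n';
    }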

@@ -137,7 +137,7 @@ try
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
/// Don't initilaize DateLUT
/// Don't initialize DateLUT
registerFunctions();
registerAggregateFunctions();

@@ -51,7 +51,7 @@ It is designed to retain the following properties of data:
- probability distributions of length of strings;
- probability of zero values of numbers; empty strings and arrays, NULLs;
- data compression ratio when compressed with LZ77 and entropy family of codecs;
- continuouty (magnitude of difference) of time values across table; continuouty of floating point values.
- continuity (magnitude of difference) of time values across table; continuity of floating point values.
- date component of DateTime values;
- UTF-8 validity of string values;
- string values continue to look somewhat natural.
@@ -246,7 +246,7 @@ Float transformFloatMantissa(Float x, UInt64 seed)
/// Transform difference from previous number by applying pseudorandom permutation to mantissa part of it.
/// It allows to retain some continuouty property of source data.
/// It allows to retain some continuity property of source data.
template <typename Float>
class FloatModel : public IModel
{

@@ -22,7 +22,7 @@ std::string validateODBCConnectionString(const std::string & connection_string)
/// Connection string is a list of name, value pairs.
/// name and value are separated by '='.
/// names are case insensitive.
/// name=value pairs are sepated by ';'.
/// name=value pairs are separated by ';'.
/// ASCII whitespace characters are skipped before and after delimiters.
/// value may be optionally enclosed by {}
/// in enclosed value, } is escaped as }}.
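
Given the escaping rule stated above ('}' inside a {}-enclosed value is doubled), a hedged sketch of what unescaping such a value could look like; `unescapeBraced` is a hypothetical helper, not the function from this patch:

    #include <cassert>
    #include <string>

    // Collapse the "}}" escape used inside {}-enclosed ODBC values back to "}".
    std::string unescapeBraced(const std::string & s)
    {
        std::string out;
        for (size_t i = 0; i < s.size(); ++i)
        {
            out += s[i];
            if (s[i] == '}' && i + 1 < s.size() && s[i + 1] == '}')
                ++i; // skip the doubled brace
        }
        return out;
    }

    int main() { assert(unescapeBraced("ab}}c") == "ab}c"); }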

@@ -25,7 +25,10 @@ endif ()
if (OS_LINUX AND MAKE_STATIC_LIBRARIES)
set (GLIBC_MAX_REQUIRED 2.4 CACHE INTERNAL "")
add_test(NAME GLIBC_required_version COMMAND bash -c "readelf -s ${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-server | grep '@GLIBC' | grep -oP 'GLIBC_[\\d\\.]+' | sort | uniq | sort -r | perl -lnE 'exit 1 if $_ gt q{GLIBC_${GLIBC_MAX_REQUIRED}}'")
# temporary disabled. to enable - change 'exit 0' to 'exit $a'
add_test(NAME GLIBC_required_version COMMAND bash -c "readelf -s ${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-server | perl -nE 'END {exit 0 if $a} ++$a, print if /\\x40GLIBC_(\\S+)/ and pack(q{C*}, split /\\./, \$1) gt pack q{C*}, split /\\./, q{${GLIBC_MAX_REQUIRED}}'")
#add_test(NAME GLIBC_required_version COMMAND bash -c "readelf -s ${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-server | grep '@GLIBC' | grep -oP 'GLIBC_[\\d\\.]+' | sort | uniq | sort --version-sort --reverse | perl -lnE 'warn($_), exit 1 if $_ gt q{GLIBC_${GLIBC_MAX_REQUIRED}}'") # old
endif ()
install (

@@ -565,7 +565,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("http_port"));
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
server_pool,
socket,
@@ -582,7 +582,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("https_port"), /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new HTTPHandlerFactory(*this, "HTTPSHandler-factory"),
server_pool,
socket,
@@ -602,7 +602,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("tcp_port"));
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::TCPServer(
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this),
server_pool,
socket,
@@ -619,7 +619,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("tcp_port_secure"), /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::TCPServer(
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure= */ true),
server_pool,
socket,
@@ -642,7 +642,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("interserver_http_port"));
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
@@ -658,7 +658,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("interserver_https_port"), /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
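
The repeated change in this file from `new Poco::Net::HTTPServer(...)` to `std::make_unique<...>` is about ownership and exception safety: the server object is owned by a smart pointer from the moment it is created, so nothing leaks if `emplace_back` throws. A minimal sketch of the pattern with a stand-in type:

    #include <memory>
    #include <vector>

    struct Server { explicit Server(int port_) : port(port_) {} int port; };

    int main()
    {
        std::vector<std::unique_ptr<Server>> servers;
        // If vector reallocation throws, the already-constructed Server is still
        // owned by the temporary unique_ptr and gets destroyed; with a raw `new`
        // argument it could leak instead.
        servers.emplace_back(std::make_unique<Server>(8123));
    }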

@@ -13,9 +13,9 @@
<!-- How to choose between replicas during distributed query processing.
random - choose random replica from set of replicas with minimum number of errors
nearest_hostname - from set of replicas with minimum number of errors, choose replica
with minumum number of different symbols between replica's hostname and local hostname
with minimum number of different symbols between replica's hostname and local hostname
(Hamming distance).
in_order - first live replica is choosen in specified order.
in_order - first live replica is chosen in specified order.
-->
<load_balancing>random</load_balancing>
</default>

@@ -109,7 +109,7 @@ struct AggreagteFunctionGroupUniqArrayGenericData
};
/** Template parameter with true value should be used for columns that store their elements in memory continuously.
* For such columns groupUniqArray() can be implemented more efficently (especially for small numeric arrays).
* For such columns groupUniqArray() can be implemented more efficiently (especially for small numeric arrays).
*/
template <bool is_plain_column = false>
class AggreagteFunctionGroupUniqArrayGeneric

@@ -86,7 +86,7 @@ public:
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
events_size = arguments.size();
events_size = static_cast<UInt8>(arguments.size());
}

@@ -123,7 +123,7 @@ struct AggregateFunctionTopKGenericData
};
/** Template parameter with true value should be used for columns that store their elements in memory continuously.
* For such columns topK() can be implemented more efficently (especially for small numeric arrays).
* For such columns topK() can be implemented more efficiently (especially for small numeric arrays).
*/
template <bool is_plain_column = false>
class AggregateFunctionTopKGeneric : public IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, AggregateFunctionTopKGeneric<is_plain_column>>

@@ -12,7 +12,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
/** Calculates quantile by counting number of occurences for each value in a hash map.
/** Calculates quantile by counting number of occurrences for each value in a hash map.
*
* It use O(distinct(N)) memory. Can be naturally applied for values with weight.
* In case of many identical values, it can be more efficient than QuantileExact even when weight is not used.

@@ -43,7 +43,7 @@ public:
virtual const char * getFamilyName() const = 0;
/** If column isn't constant, returns nullptr (or itself).
* If column is constant, transforms constant to full column (if column type allows such tranform) and return it.
* If column is constant, transforms constant to full column (if column type allows such transform) and return it.
*/
virtual Ptr convertToFullColumnIfConst() const { return getPtr(); }
@@ -149,7 +149,7 @@ public:
virtual void insertDefault() = 0;
/** Removes last n elements.
* Is used to support exeption-safety of several operations.
* Is used to support exception-safety of several operations.
* For example, sometimes insertion should be reverted if we catch an exception during operation processing.
* If column has less than n elements or n == 0 - undefined behavior.
*/
@@ -234,8 +234,8 @@ public:
virtual void gather(ColumnGathererStream & gatherer_stream) = 0;
/** Computes minimum and maximum element of the column.
* In addition to numeric types, the funtion is completely implemented for Date and DateTime.
* For strings and arrays function should retrurn default value.
* In addition to numeric types, the function is completely implemented for Date and DateTime.
* For strings and arrays function should return default value.
* (except for constant columns; they should return value of the constant).
* If column is empty function should return default value.
*/

@@ -64,7 +64,7 @@ namespace DB
* During insertion, each key is locked - to avoid parallel initialization of regions for same key.
*
* On insertion, we search for free region of at least requested size.
* If nothing was found, we evict oldest unused region; if not enogh size, we evict it neighbours; and try again.
* If nothing was found, we evict oldest unused region; if not enough size, we evict it neighbours; and try again.
*
* Metadata is allocated by usual allocator and its memory usage is not accounted.
*

@@ -23,7 +23,7 @@ using namespace Poco::XML;
namespace DB
{
/// For cutting prerpocessed path to this base
/// For cutting preprocessed path to this base
static std::string main_config_path;
/// Extracts from a string the first encountered number consisting of at least two digits.

@@ -12,7 +12,7 @@
* - in typical implementation of standard library, hash function for integers is trivial and just use lower bits;
* - traffic is non-uniformly distributed across a day;
* - we are using open-addressing linear probing hash tables that are most critical to hash function quality,
* and trivial hash function gives disasterous results.
* and trivial hash function gives disastrous results.
*/
/** Taken from MurmurHash. This is Murmur finalizer.
@@ -160,7 +160,7 @@ struct TrivialHash
* NOTE Salting is far from perfect, because it commutes with first steps of calculation.
*
* NOTE As mentioned, this function is slower than intHash64.
* But occasionaly, it is faster, when written in a loop and loop is vectorized.
* But occasionally, it is faster, when written in a loop and loop is vectorized.
*/
template <DB::UInt64 salt>
inline DB::UInt32 intHash32(DB::UInt64 key)

@@ -165,7 +165,7 @@ void OptimizedRegularExpressionImpl<thread_safe>::analyze(
++pos;
break;
/// Quantifiers that allow a zero number of occurences.
/// Quantifiers that allow a zero number of occurrences.
case '{':
in_curly_braces = true;
[[fallthrough]];

@@ -40,7 +40,7 @@ struct SpaceSavingArena
/*
* Specialized storage for StringRef with a freelist arena.
* Keys of this type that are retained on insertion must be serialised into local storage,
* Keys of this type that are retained on insertion must be serialized into local storage,
* otherwise the reference would be invalid after the processed block is released.
*/
template <>

@@ -41,7 +41,7 @@
* - extremely creepy code for implementation of "chroot" feature.
*
* As of 2018, there are no active maintainers of libzookeeper:
* - bugs in JIRA are fixed only occasionaly with ad-hoc patches by library users.
* - bugs in JIRA are fixed only occasionally with ad-hoc patches by library users.
*
* libzookeeper is a classical example of bad code written in C.
*

@@ -5,7 +5,7 @@
/// A tool for reproducing https://issues.apache.org/jira/browse/ZOOKEEPER-706
/// Original libzookeeper can't reconnect the session if the length of SET_WATCHES message
/// exceeeds jute.maxbuffer (0xfffff by default).
/// exceeds jute.maxbuffer (0xfffff by default).
/// This happens when the number of watches exceeds ~29000.
///
/// Session reconnect can be caused by forbidding packets to the current zookeeper server, e.g.

@@ -43,7 +43,7 @@ CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_,
size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(codec_)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
{
}

@@ -51,7 +51,7 @@ using MergedRowSources = PODArray<RowSourcePart>;
/** Gather single stream from multiple streams according to streams mask.
* Stream mask maps row number to index of source stream.
* Streams should conatin exactly one column.
* Streams should contain exactly one column.
*/
class ColumnGathererStream : public IProfilingBlockInputStream
{

@@ -85,7 +85,7 @@ Block FinishSortingBlockInputStream::readImpl()
{
Block block = children.back()->read();
/// End of input stream, but we can`t return immediatly, we need to merge already read blocks.
/// End of input stream, but we can`t return immediately, we need to merge already read blocks.
/// Check it later, when get end of stream from impl.
if (!block)
{
@@ -102,7 +102,7 @@ Block FinishSortingBlockInputStream::readImpl()
if (size == 0)
continue;
/// We need to sort each block separatly before merging.
/// We need to sort each block separately before merging.
sortBlock(block, description_to_sort);
removeConstantsFromBlock(block);

@@ -17,7 +17,7 @@ namespace DB
/** Intended for implementation of "rollup" - aggregation (rounding) of older data
* for a table with Graphite data (Graphite is the system for time series monitoring).
*
* Table with graphite data has at least the folowing columns (accurate to the name):
* Table with graphite data has at least the following columns (accurate to the name):
* Path, Time, Value, Version
*
* Path - name of metric (sensor);

@@ -322,7 +322,7 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerSimpleDataType("String", creator);
/// These synonims are added for compatibility.
/// These synonyms are added for compatibility.
factory.registerAlias("CHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("VARCHAR", "String", DataTypeFactory::CaseInsensitive);

@@ -20,7 +20,7 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
factory.registerSimpleDataType("Float32", [] { return DataTypePtr(std::make_shared<DataTypeFloat32>()); });
factory.registerSimpleDataType("Float64", [] { return DataTypePtr(std::make_shared<DataTypeFloat64>()); });
/// These synonims are added for compatibility.
/// These synonyms are added for compatibility.
factory.registerAlias("TINYINT", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("SMALLINT", "Int16", DataTypeFactory::CaseInsensitive);

@@ -188,7 +188,7 @@ template <typename A> struct ToInteger
// CLICKHOUSE-29. The same depth, different signs
// NOTE: This case is applied for 64-bit integers only (for backward compability), but could be used for any-bit integers
// NOTE: This case is applied for 64-bit integers only (for backward compatibility), but could be used for any-bit integers
template <typename A, typename B>
constexpr bool LeastGreatestSpecialCase =
std::is_integral_v<A> && std::is_integral_v<B>

@@ -81,11 +81,6 @@ CacheDictionary::CacheDictionary(
createAttributes();
}
CacheDictionary::CacheDictionary(const CacheDictionary & other)
: CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{
}
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{

@@ -30,8 +30,6 @@ public:
const DictionaryLifetime dict_lifetime,
const size_t size);
CacheDictionary(const CacheDictionary & other);
std::exception_ptr getCreationException() const override { return {}; }
std::string getName() const override { return name; }
@@ -53,7 +51,10 @@ public:
bool isCached() const override { return true; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<CacheDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<CacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, size);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
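
The same reshaping is applied to every dictionary in this commit: the copy constructor goes away and `clone()` rebuilds a fresh instance from the original construction parameters with a cloned source, instead of copy-constructing internal state. A sketch of the shape of that change, with hypothetical simplified types:

    #include <memory>
    #include <string>

    struct Source
    {
        std::unique_ptr<Source> clone() const { return std::make_unique<Source>(*this); }
    };

    class Dictionary
    {
    public:
        Dictionary(std::string name_, std::unique_ptr<Source> source_)
            : name(std::move(name_)), source(std::move(source_)) {}

        // Rebuild from parameters rather than copying caches and other state.
        std::unique_ptr<Dictionary> clone() const
        {
            return std::make_unique<Dictionary>(name, source->clone());
        }

    private:
        std::string name;
        std::unique_ptr<Source> source;
    };

    int main()
    {
        Dictionary d("d", std::make_unique<Source>());
        auto d2 = d.clone();
    }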

@@ -70,10 +70,6 @@ ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(
createAttributes();
}
ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const ComplexKeyCacheDictionary & other)
: ComplexKeyCacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{
}
void ComplexKeyCacheDictionary::getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const

@@ -47,8 +47,6 @@ public:
const DictionaryLifetime dict_lifetime,
const size_t size);
ComplexKeyCacheDictionary(const ComplexKeyCacheDictionary & other);
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return {}; }
@@ -76,7 +74,10 @@ public:
bool isCached() const override { return true; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<ComplexKeyCacheDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<ComplexKeyCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, size);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

@@ -43,12 +43,6 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
creation_time = std::chrono::system_clock::now();
}
ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other)
: ComplexKeyHashedDictionary{
other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
#define DECLARE(TYPE) \
void ComplexKeyHashedDictionary::get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ResultArrayType<TYPE> & out) const \

@@ -29,8 +29,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other);
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return creation_exception; }
@@ -51,7 +49,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<ComplexKeyHashedDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<ComplexKeyHashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

@@ -50,12 +50,6 @@ FlatDictionary::FlatDictionary(
creation_time = std::chrono::system_clock::now();
}
FlatDictionary::FlatDictionary(const FlatDictionary & other)
: FlatDictionary{
other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
void FlatDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{

@@ -28,8 +28,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
FlatDictionary(const FlatDictionary & other);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
@@ -48,7 +46,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<FlatDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<FlatDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

@@ -44,12 +44,6 @@ HashedDictionary::HashedDictionary(
creation_time = std::chrono::system_clock::now();
}
HashedDictionary::HashedDictionary(const HashedDictionary & other)
: HashedDictionary{
other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
void HashedDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{

@@ -27,8 +27,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
HashedDictionary(const HashedDictionary & other);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
@@ -47,7 +45,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<HashedDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<HashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

@@ -3,6 +3,7 @@
#include <vector>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class IDictionarySource;

@@ -94,12 +94,6 @@ RangeHashedDictionary::RangeHashedDictionary(
creation_time = std::chrono::system_clock::now();
}
RangeHashedDictionary::RangeHashedDictionary(const RangeHashedDictionary & other)
: RangeHashedDictionary{
other.dictionary_name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
{
}
#define DECLARE_MULTIPLE_GETTER(TYPE) \
void RangeHashedDictionary::get##TYPE( \

@@ -24,8 +24,6 @@ public:
const DictionaryLifetime dict_lifetime,
bool require_nonempty);
RangeHashedDictionary(const RangeHashedDictionary & other);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return dictionary_name; }
@@ -44,7 +42,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<RangeHashedDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<RangeHashedDictionary>(dictionary_name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

@@ -63,11 +63,6 @@ TrieDictionary::TrieDictionary(
creation_time = std::chrono::system_clock::now();
}
TrieDictionary::TrieDictionary(const TrieDictionary & other)
: TrieDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
{
}
TrieDictionary::~TrieDictionary()
{
btrie_destroy(trie);

@@ -29,8 +29,6 @@ public:
const DictionaryLifetime dict_lifetime,
bool require_nonempty);
TrieDictionary(const TrieDictionary & other);
~TrieDictionary() override;
std::string getKeyDescription() const { return key_description; }
@@ -53,7 +51,10 @@ public:
bool isCached() const override { return false; }
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<TrieDictionary>(*this); }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<TrieDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }

@@ -11,7 +11,7 @@
#include <Common/config.h>
/** More efficient implementations of mathematical functions are possible when using a separate library.
* Disabled due to licence compatibility limitations.
* Disabled due to license compatibility limitations.
* To enable: download http://www.agner.org/optimize/vectorclass.zip and unpack to contrib/vectorclass
* Then rebuild with -DENABLE_VECTORCLASS=1
*/

@@ -10,7 +10,7 @@
#include <Common/config.h>
/** More efficient implementations of mathematical functions are possible when using a separate library.
* Disabled due to licence compatibility limitations.
* Disabled due to license compatibility limitations.
* To enable: download http://www.agner.org/optimize/vectorclass.zip and unpack to contrib/vectorclass
* Then rebuild with -DENABLE_VECTORCLASS=1
*/

@@ -24,7 +24,7 @@ namespace ErrorCodes
*
* Non-cryptographic generators:
*
* rand - linear congruental generator 0 .. 2^32 - 1.
* rand - linear congruential generator 0 .. 2^32 - 1.
* rand64 - combines several rand values to get values from the range 0 .. 2^64 - 1.
*
* randConstant - service function, produces a constant column with a random value.

@@ -39,7 +39,7 @@ namespace DB
* replaceRegexpOne(haystack, pattern, replacement) - replaces the pattern with the specified regexp, only the first occurrence.
* replaceRegexpAll(haystack, pattern, replacement) - replaces the pattern with the specified type, all occurrences.
*
* multiPosition(haystack, [pattern_1, pattern_2, ..., pattern_n]) -- find first occurences (positions) of all the const patterns inside haystack
* multiPosition(haystack, [pattern_1, pattern_2, ..., pattern_n]) -- find first occurrences (positions) of all the const patterns inside haystack
* multiPositionUTF8(haystack, [pattern_1, pattern_2, ..., pattern_n])
* multiPositionCaseInsensitive(haystack, [pattern_1, pattern_2, ..., pattern_n])
* multiPositionCaseInsensitiveUTF8(haystack, [pattern_1, pattern_2, ..., pattern_n])

@@ -23,7 +23,7 @@ namespace DB
* queryStringAndFragment
*
* Functions, removing parts from URL.
* If URL has nothing like, then it is retured unchanged.
* If URL has nothing like, then it is returned unchanged.
*
* cutWWW
* cutFragment

@@ -163,7 +163,7 @@ public:
* Function could be injective with some arguments fixed to some constant values.
* Examples:
* plus(const, x);
* multiply(const, x) where x is an integer and constant is not divisable by two;
* multiply(const, x) where x is an integer and constant is not divisible by two;
* concat(x, 'const');
* concat(x, 'const', y) where const contain at least one non-numeric character;
* concat with FixedString

@@ -0,0 +1,68 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <boost/filesystem.hpp>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
struct FilesystemAvailable
{
static constexpr auto name = "filesystemAvailable";
static boost::uintmax_t get(boost::filesystem::space_info & spaceinfo) { return spaceinfo.available; }
};
struct FilesystemFree
{
static constexpr auto name = "filesystemFree";
static boost::uintmax_t get(boost::filesystem::space_info & spaceinfo) { return spaceinfo.free; }
};
struct FilesystemCapacity
{
static constexpr auto name = "filesystemCapacity";
static boost::uintmax_t get(boost::filesystem::space_info & spaceinfo) { return spaceinfo.capacity; }
};
template <typename Impl>
class FilesystemImpl : public IFunction
{
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(const Context & context)
{
return std::make_shared<FilesystemImpl<Impl>>(boost::filesystem::space(context.getConfigRef().getString("path")));
}
explicit FilesystemImpl(boost::filesystem::space_info spaceinfo_) : spaceinfo(spaceinfo_) { }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isDeterministic() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt64>();
}
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
{
block.getByPosition(result).column = DataTypeUInt64().createColumnConst(input_rows_count, static_cast<UInt64>(Impl::get(spaceinfo)));
}
private:
boost::filesystem::space_info spaceinfo;
};
void registerFunctionFilesystem(FunctionFactory & factory)
{
factory.registerFunction<FilesystemImpl<FilesystemAvailable>>();
factory.registerFunction<FilesystemImpl<FilesystemCapacity>>();
factory.registerFunction<FilesystemImpl<FilesystemFree>>();
}
}

@@ -36,7 +36,7 @@ namespace ErrorCodes
*
* It is implemented in two steps.
* At first step, it creates a pattern of zeros, literal characters, whitespaces, etc.
* and quickly fills resulting charater array (string column) with this pattern.
* and quickly fills resulting character array (string column) with this pattern.
* At second step, it walks across the resulting character array and modifies/replaces specific charaters,
* by calling some functions by pointers and shifting cursor by specified amount.
*

@@ -904,7 +904,7 @@ public:
using T0 = typename Types::LeftType;
using T1 = typename Types::RightType;
if constexpr ((IsDecimalNumber<T0> && IsDecimalNumber<T1>) || (!IsDecimalNumber<T0> && !IsDecimalNumber<T1>))
if constexpr (IsDecimalNumber<T0> == IsDecimalNumber<T1>)
return executeTyped<T0, T1>(cond_col, block, arguments, result, input_rows_count);
else
throw Exception("Conditional function with Decimal and non Decimal", ErrorCodes::NOT_IMPLEMENTED);
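
The simplification above rests on a boolean identity: for booleans A and B, `(A && B) || (!A && !B)` is exactly `A == B`, so the condition reads as "both types decimal or both not". A compile-time check of the identity:

    template <bool A, bool B>
    constexpr bool both_or_neither = (A && B) || (!A && !B);

    static_assert(both_or_neither<true, true> == (true == true), "");
    static_assert(both_or_neither<true, false> == (true == false), "");
    static_assert(both_or_neither<false, true> == (false == true), "");
    static_assert(both_or_neither<false, false> == (false == false), "");

    int main() {}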

@@ -17,7 +17,7 @@ struct MinusImpl
return static_cast<Result>(a) - b;
}
/// Apply operation and check overflow. It's used for Deciamal operations. @returns true if overflowed, false othervise.
/// Apply operation and check overflow. It's used for Deciamal operations. @returns true if overflowed, false otherwise.
template <typename Result = ResultType>
static inline bool apply(A a, B b, Result & c)
{

@@ -17,7 +17,7 @@ struct MultiplyImpl
return static_cast<Result>(a) * b;
}
/// Apply operation and check overflow. It's used for Deciamal operations. @returns true if overflowed, false othervise.
/// Apply operation and check overflow. It's used for Deciamal operations. @returns true if overflowed, false otherwise.
template <typename Result = ResultType>
static inline bool apply(A a, B b, Result & c)
{

@@ -18,7 +18,7 @@ struct PlusImpl
return static_cast<Result>(a) + b;
}
/// Apply operation and check overflow. It's used for Deciamal operations. @returns true if overflowed, false othervise.
/// Apply operation and check overflow. It's used for Deciamal operations. @returns true if overflowed, false otherwise.
template <typename Result = ResultType>
static inline bool apply(A a, B b, Result & c)
{

@@ -41,6 +41,7 @@ void registerFunctionLowCardinalityIndices(FunctionFactory &);
void registerFunctionLowCardinalityKeys(FunctionFactory &);
void registerFunctionsIn(FunctionFactory &);
void registerFunctionJoinGet(FunctionFactory &);
void registerFunctionFilesystem(FunctionFactory &);
void registerFunctionsMiscellaneous(FunctionFactory & factory)
{
@@ -82,6 +83,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionLowCardinalityKeys(factory);
registerFunctionsIn(factory);
registerFunctionJoinGet(factory);
registerFunctionFilesystem(factory);
}
}

@@ -0,0 +1,43 @@
#include <IO/HDFSCommon.h>
#if USE_HDFS
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
{
auto & host = uri.getHost();
auto port = uri.getPort();
auto & path = uri.getPath();
if (host.empty() || port == 0 || path.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderPtr builder(hdfsNewBuilder());
if (builder == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
hdfsBuilderSetNameNode(builder.get(), host.c_str());
hdfsBuilderSetNameNodePort(builder.get(), port);
return builder;
}
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
if (fs == nullptr)
throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return fs;
}
}
#endif

dbms/src/IO/HDFSCommon.h (new file)
@@ -0,0 +1,38 @@
#include <Common/config.h>
#include <memory>
#include <type_traits>
#include <Poco/URI.h>
#if USE_HDFS
#include <hdfs/hdfs.h>
namespace DB
{
namespace detail
{
struct HDFSBuilderDeleter
{
void operator()(hdfsBuilder * builder_ptr)
{
hdfsFreeBuilder(builder_ptr);
}
};
struct HDFSFsDeleter
{
void operator()(hdfsFS fs_ptr)
{
hdfsDisconnect(fs_ptr);
}
};
}
using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>;
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & hdfs_uri);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif
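
The header above manages the C handles with `std::unique_ptr` plus stateless deleter functors; `std::remove_pointer_t<hdfsFS>` is needed because `hdfsFS` is itself a pointer typedef. The same idiom against the C stdio API, as a self-contained sketch:

    #include <cstdio>
    #include <memory>

    struct FileDeleter
    {
        void operator()(std::FILE * f) const { if (f) std::fclose(f); }
    };

    using FilePtr = std::unique_ptr<std::FILE, FileDeleter>;

    int main()
    {
        FilePtr f(std::fopen("/dev/null", "r")); // closed automatically on scope exit
        return f ? 0 : 1;
    }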

@@ -0,0 +1,76 @@
#include <IO/ReadBufferFromHDFS.h>
#if USE_HDFS
#include <Poco/URI.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int CANNOT_OPEN_FILE;
}
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
{
Poco::URI hdfs_uri;
hdfsFile fin;
HDFSBuilderPtr builder;
HDFSFSPtr fs;
ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
: hdfs_uri(hdfs_name_)
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
auto & path = hdfs_uri.getPath();
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
if (fin == nullptr)
throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_OPEN_FILE);
}
int read(char * start, size_t size)
{
int bytes_read = hdfsRead(fs.get(), fin, start, size);
if (bytes_read < 0)
throw Exception("Fail to read HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return bytes_read;
}
~ReadBufferFromHDFSImpl()
{
hdfsCloseFile(fs.get(), fin);
}
};
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
{
}
bool ReadBufferFromHDFS::nextImpl()
{
int bytes_read = impl->read(internal_buffer.begin(), internal_buffer.size());
if (bytes_read)
working_buffer.resize(bytes_read);
else
return false;
return true;
}
ReadBufferFromHDFS::~ReadBufferFromHDFS()
{
}
}
#endif

@@ -4,94 +4,26 @@
#if USE_HDFS
#include <IO/ReadBuffer.h>
#include <Poco/URI.h>
#include <hdfs/hdfs.h>
#include <IO/BufferWithOwnMemory.h>
#include <string>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
/** Accepts path to file and opens it, or pre-opened file descriptor.
* Closes file by himself (thus "owns" a file descriptor).
*/
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
{
protected:
std::string hdfs_uri;
struct hdfsBuilder *builder;
hdfsFS fs;
hdfsFile fin;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<ReadBuffer>(buf_size), hdfs_uri(hdfs_name_) , builder(hdfsNewBuilder())
{
Poco::URI uri(hdfs_name_);
auto & host = uri.getHost();
auto port = uri.getPort();
auto & path = uri.getPath();
if (host.empty() || port == 0 || path.empty())
{
throw Exception("Illegal HDFS URI: " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
}
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
hdfsBuilderConfSetStr(builder, "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder, "input.connect.timeout", "60000"); // 1 min
/** Accepts HDFS path to file and opens it.
* Closes file by himself (thus "owns" a file descriptor).
*/
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
{
struct ReadBufferFromHDFSImpl;
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
hdfsBuilderSetNameNode(builder, host.c_str());
hdfsBuilderSetNameNodePort(builder, port);
fs = hdfsBuilderConnect(builder);
bool nextImpl() override;
if (fs == nullptr)
{
throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
}
fin = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
}
ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
~ReadBufferFromHDFS() override
{
close();
hdfsFreeBuilder(builder);
}
/// Close HDFS connection before destruction of object.
void close()
{
hdfsCloseFile(fs, fin);
}
bool nextImpl() override
{
int bytes_read = hdfsRead(fs, fin, internal_buffer.begin(), internal_buffer.size());
if (bytes_read < 0)
{
throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
}
if (bytes_read)
working_buffer.resize(bytes_read);
else
return false;
return true;
}
const std::string & getHDFSUri() const
{
return hdfs_uri;
}
};
~ReadBufferFromHDFS() override;
};
}
#endif
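
This header is the payoff of the refactor: the libhdfs3 types and handles move behind a forward-declared `ReadBufferFromHDFSImpl`, so including ReadBufferFromHDFS.h no longer drags in `hdfs/hdfs.h`; the destructor is declared here and defined out of line because `~unique_ptr` requires the complete Impl type. A minimal pimpl sketch, hypothetical `Widget` class with both halves shown in one translation unit:

    // widget.h -- no heavy headers leak to users of this class.
    #include <memory>

    class Widget
    {
        struct Impl;              // defined only in widget.cpp
        std::unique_ptr<Impl> impl;
    public:
        Widget();
        ~Widget();                // out of line: Impl is incomplete here
    };

    // widget.cpp
    struct Widget::Impl { int state = 0; };
    Widget::Widget() : impl(std::make_unique<Impl>()) {}
    Widget::~Widget() = default;

    int main() { Widget w; }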

@@ -991,7 +991,7 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf)
if (buf.eof())
return;
/// Skip escaped character. We do not consider escape sequences with more than one charater after backslash (\x01).
/// Skip escaped character. We do not consider escape sequences with more than one character after backslash (\x01).
/// It's ok for the purpose of this function, because we are interested only in \n and \\.
++buf.position();
continue;

@@ -581,7 +581,7 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons
{
/** Read 10 characters, that could represent unix timestamp.
* Only unix timestamp of 5-10 characters is supported.
* Then look at 5th charater. If it is a number - treat whole as unix timestamp.
* Then look at 5th character. If it is a number - treat whole as unix timestamp.
* If it is not a number - then parse datetime in YYYY-MM-DD hh:mm:ss format.
*/

@@ -0,0 +1,95 @@
#include <IO/WriteBufferFromHDFS.h>
#if USE_HDFS
#include <Poco/URI.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_FSYNC;
}
struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
Poco::URI hdfs_uri;
hdfsFile fout;
HDFSBuilderPtr builder;
HDFSFSPtr fs;
WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
: hdfs_uri(hdfs_name_)
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
auto & path = hdfs_uri.getPath();
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0);
if (fout == nullptr)
{
throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_OPEN_FILE);
}
}
~WriteBufferFromHDFSImpl()
{
hdfsCloseFile(fs.get(), fout);
}
int write(const char * start, size_t size)
{
int bytes_written = hdfsWrite(fs.get(), fout, start, size);
if (bytes_written < 0)
throw Exception("Fail to write HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return bytes_written;
}
void sync()
{
int result = hdfsSync(fs.get(), fout);
if (result < 0)
throwFromErrno("Cannot HDFS sync" + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_FSYNC);
}
};
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_))
{
}
void WriteBufferFromHDFS::nextImpl()
{
if (!offset())
return;
size_t bytes_written = 0;
while (bytes_written != offset())
bytes_written += impl->write(working_buffer.begin() + bytes_written, offset() - bytes_written);
}
void WriteBufferFromHDFS::sync()
{
impl->sync();
}
WriteBufferFromHDFS::~WriteBufferFromHDFS()
{
}
}
#endif
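
`nextImpl()` above loops because a single `hdfsWrite` may accept fewer bytes than requested, so the buffer is drained with repeated calls until `bytes_written == offset()`. The same drain-until-done pattern over a generic write callback, as a sketch:

    #include <cstddef>
    #include <cstring>
    #include <functional>

    // Keep calling write_some until the whole range is flushed;
    // each call may consume only part of what remains.
    void writeAll(const char * data, size_t size,
                  const std::function<size_t(const char *, size_t)> & write_some)
    {
        size_t written = 0;
        while (written != size)
            written += write_some(data + written, size - written);
    }

    int main()
    {
        char sink[16];
        size_t pos = 0;
        writeAll("hello", 5, [&](const char * p, size_t n)
        {
            size_t chunk = n > 2 ? 2 : n; // simulate short writes
            std::memcpy(sink + pos, p, chunk);
            pos += chunk;
            return chunk;
        });
    }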

@@ -0,0 +1,31 @@
#pragma once
#include <Common/config.h>
#if USE_HDFS
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <string>
#include <memory>
namespace DB
{
/** Accepts HDFS path to file and opens it.
* Closes file by himself (thus "owns" a file descriptor).
*/
class WriteBufferFromHDFS : public BufferWithOwnMemory<WriteBuffer>
{
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
public:
WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default;
void nextImpl() override;
~WriteBufferFromHDFS() override;
void sync();
};
}
#endif

@@ -15,12 +15,22 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ALIAS_REQUIRED;
extern const int MULTIPLE_EXPRESSIONS_FOR_ALIAS;
extern const int LOGICAL_ERROR;
}
/// Fills the array_join_result_to_source: on which columns-arrays to replicate, and how to call them after that.
class ArrayJoinedColumnsMatcher
{
public:
struct Data
{
using Aliases = std::unordered_map<String, ASTPtr>;
const Aliases & aliases;
NameToNameMap & array_join_name_to_alias;
NameToNameMap & array_join_alias_to_name;
NameToNameMap & array_join_result_to_source;
@@ -30,10 +40,6 @@ public:
static bool needChildVisit(ASTPtr & node, const ASTPtr & child)
{
/// Processed
if (typeid_cast<ASTIdentifier *>(node.get()))
return false;
if (typeid_cast<ASTTablesInSelectQuery *>(node.get()))
return false;
@@ -48,10 +54,42 @@ public:
{
if (auto * t = typeid_cast<ASTIdentifier *>(ast.get()))
visit(*t, ast, data);
if (auto * t = typeid_cast<ASTSelectQuery *>(ast.get()))
return visit(*t, ast, data);
return {};
}
private:
static std::vector<ASTPtr *> visit(const ASTSelectQuery & node, ASTPtr &, Data & data)
{
ASTPtr array_join_expression_list = node.array_join_expression_list();
if (!array_join_expression_list)
throw Exception("Logical error: no ARRAY JOIN", ErrorCodes::LOGICAL_ERROR);
std::vector<ASTPtr *> out;
out.reserve(array_join_expression_list->children.size());
for (ASTPtr & ast : array_join_expression_list->children)
{
const String nested_table_name = ast->getColumnName();
const String nested_table_alias = ast->getAliasOrColumnName();
if (nested_table_alias == nested_table_name && !isIdentifier(ast))
throw Exception("No alias for non-trivial value in ARRAY JOIN: " + nested_table_name, ErrorCodes::ALIAS_REQUIRED);
if (data.array_join_alias_to_name.count(nested_table_alias) || data.aliases.count(nested_table_alias))
throw Exception("Duplicate alias in ARRAY JOIN: " + nested_table_alias, ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS);
data.array_join_alias_to_name[nested_table_alias] = nested_table_name;
data.array_join_name_to_alias[nested_table_name] = nested_table_alias;
for (ASTPtr & child2 : ast->children)
out.emplace_back(&child2);
}
return out;
}
static void visit(const ASTIdentifier & node, ASTPtr &, Data & data)
{
NameToNameMap & array_join_name_to_alias = data.array_join_name_to_alias;

@@ -104,18 +104,17 @@ String Cluster::Address::readableString() const
return res;
}
void Cluster::Address::fromString(const String & host_port_string, String & host_name, UInt16 & port)
std::pair<String, UInt16> Cluster::Address::fromString(const String & host_port_string)
{
auto pos = host_port_string.find_last_of(':');
if (pos == std::string::npos)
throw Exception("Incorrect <host>:<port> format " + host_port_string, ErrorCodes::SYNTAX_ERROR);
host_name = unescapeForFileName(host_port_string.substr(0, pos));
port = parse<UInt16>(host_port_string.substr(pos + 1));
return {unescapeForFileName(host_port_string.substr(0, pos)), parse<UInt16>(host_port_string.substr(pos + 1))};
}
String Cluster::Address::toStringFull() const
String Cluster::Address::toFullString() const
{
return
escapeForFileName(user) +
@@ -126,6 +125,42 @@ String Cluster::Address::toStringFull() const
+ ((secure == Protocol::Secure::Enable) ? "+secure" : "");
}
Cluster::Address Cluster::Address::fromFullString(const String & full_string)
{
const char * address_begin = full_string.data();
const char * address_end = address_begin + full_string.size();
Protocol::Secure secure = Protocol::Secure::Disable;
const char * secure_tag = "+secure";
if (endsWith(full_string, secure_tag))
{
address_end -= strlen(secure_tag);
secure = Protocol::Secure::Enable;
}
const char * user_pw_end = strchr(full_string.data(), '@');
const char * colon = strchr(full_string.data(), ':');
if (!user_pw_end || !colon)
throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR);
const char * has_db = strchr(full_string.data(), '#');
const char * port_end = has_db ? has_db : address_end;
Address address;
address.secure = secure;
address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end));
address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
return address;
}
/// Implementation of Clusters class
@@ -202,7 +237,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
const auto weight = config.getInt(prefix + ".weight", default_weight);
addresses.emplace_back(config, prefix);
addresses.back().replica_num = 1;
const auto & address = addresses.back();
ShardInfo info;
@@ -257,14 +291,13 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
if (startsWith(replica_key, "replica"))
{
replica_addresses.emplace_back(config, partial_prefix + replica_key);
replica_addresses.back().replica_num = current_replica_num;
++current_replica_num;
if (!replica_addresses.back().is_local)
{
if (internal_replication)
{
auto dir_name = replica_addresses.back().toStringFull();
auto dir_name = replica_addresses.back().toFullString();
if (first)
dir_name_for_internal_replication = dir_name;
else


@ -59,7 +59,6 @@ public:
String password;
/// This database is selected when no database is specified for Distributed table
String default_database;
UInt32 replica_num;
/// The locality is determined at the initialization, and is not changed even if DNS is changed
bool is_local;
bool user_specified = false;
@ -79,10 +78,11 @@ public:
static String toString(const String & host_name, UInt16 port);
static void fromString(const String & host_port_string, String & host_name, UInt16 & port);
static std::pair<String, UInt16> fromString(const String & host_port_string);
/// Returns escaped user:password@resolved_host_address:resolved_host_port#default_database
String toStringFull() const;
String toFullString() const;
static Address fromFullString(const String & address_full_string);
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const
@ -90,6 +90,9 @@ public:
return initially_resolved_address;
}
auto tuple() const { return std::tie(host_name, port, secure, user, password, default_database); }
bool operator==(const Address & other) const { return tuple() == other.tuple(); }
private:
Poco::Net::SocketAddress initially_resolved_address;
};


@ -186,10 +186,10 @@ SharedLibraryPtr Compiler::getOrCount(
static void addCodeToAssertHeadersMatch(WriteBuffer & out)
{
out <<
"#define STRING2(x) #x\n"
"#define STRING(x) STRING2(x)\n"
"#include <Common/config_version.h>\n"
"#if VERSION_REVISION != " << ClickHouseRevision::get() << "\n"
"#define STRING2(x) #x\n"
"#define STRING(x) STRING2(x)\n"
"#pragma message \"ClickHouse headers revision = \" STRING(VERSION_REVISION) \n"
"#error \"ClickHouse headers revision doesn't match runtime revision of the server (" << ClickHouseRevision::get() << ").\"\n"
"#endif\n\n";


@ -124,7 +124,7 @@ struct ContextShared
ConfigurationPtr config; /// Global configuration settings.
Databases databases; /// List of databases and tables in them.
mutable std::shared_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaeis. Have lazy initialization.
mutable std::shared_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::shared_ptr<ExternalDictionaries> external_dictionaries;
mutable std::shared_ptr<ExternalModels> external_models;
String default_profile_name; /// Default profile name used for default values.


@ -70,7 +70,7 @@ struct HostID
static HostID fromString(const String & host_port_str)
{
HostID res;
Cluster::Address::fromString(host_port_str, res.host_name, res.port);
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
return res;
}
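
Returning a pair instead of filling out-parameters lets each call site pick its idiom; a compilable sketch of both styles, with a stub standing in for Cluster::Address::fromString:

    #include <string>
    #include <tuple>
    #include <utility>

    std::pair<std::string, unsigned short> fromStringStub(const std::string &)
    {
        return {"localhost", 9000};  // stand-in for the real parser
    }

    int main()
    {
        // Assign into already-declared variables via std::tie, as HostID::fromString does:
        std::string host;
        unsigned short port;
        std::tie(host, port) = fromStringStub("localhost:9000");

        // Or bind fresh names in one declaration (C++17 structured bindings),
        // as the DDL status reader in a later hunk does:
        auto [host2, port2] = fromStringStub("localhost:9000");
        return (host == host2 && port == port2) ? 0 : 1;
    }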
@ -276,7 +276,7 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason)
catch (...)
{
/// What should we do if we even cannot parse host name and therefore cannot properly submit execution status?
/// We can try to create fail node using FQDN if it equal to host name in cluster config attempt will be sucessfull.
/// We can try to create the fail node using FQDN: if it equals the host name in the cluster config, the attempt will be successful.
/// Otherwise, that node will be ignored by DDLQueryStatusInputSream.
tryLogCurrentException(log, "Cannot parse DDL task " + entry_name + ", will try to send error status");
@ -1076,9 +1076,7 @@ public:
status.tryDeserializeText(status_data);
}
String host;
UInt16 port;
Cluster::Address::fromString(host_id, host, port);
auto [host, port] = Cluster::Address::fromString(host_id);
if (status.code != 0 && first_exception == nullptr)
first_exception = std::make_unique<Exception>("There was an error on [" + host + ":" + toString(port) + "]: " + status.message, status.code);
@ -1155,7 +1153,7 @@ private:
Strings current_active_hosts; /// Hosts that were in active state at the last check
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of excecution
/// Save the first detected error and throw it at the end of execution
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;


@ -3,6 +3,7 @@
#include <chrono>
#include <string>
#include <memory>
#include <boost/noncopyable.hpp>
#include <Core/Types.h>
@ -26,7 +27,7 @@ struct ExternalLoadableLifetime final
/// Basic interface for external loadable objects. Is used in ExternalLoader.
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable
{
public:
virtual ~IExternalLoadable() = default;
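
Private inheritance from boost::noncopyable deletes the copy operations for every loadable object; a dependency-free sketch of the same effect:

    // Hand-written equivalent of boost::noncopyable: copying anything
    // derived from this base no longer compiles.
    class NonCopyableBase
    {
    public:
        NonCopyableBase() = default;
        NonCopyableBase(const NonCopyableBase &) = delete;
        NonCopyableBase & operator=(const NonCopyableBase &) = delete;
    };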


@ -77,7 +77,7 @@ BlockIO InterpreterRenameQuery::execute()
std::set<UniqueTableName> unique_tables_from;
/// Don't allow to drop tables (that we are renaming); do't allow to create tables in places where tables will be renamed.
/// Don't allow to drop tables (that we are renaming); don't allow to create tables in places where tables will be renamed.
std::map<UniqueTableName, std::unique_ptr<DDLGuard>> table_guards;
for (const auto & elem : rename.elements)


@ -586,7 +586,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
executePreLimit(pipeline);
}
// If there is no global subqueries, we can run subqueries only when recieve them on server.
// If there is no global subqueries, we can run subqueries only when receive them on server.
if (!query_analyzer->hasGlobalSubqueries() && !expressions.subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
}


@ -53,7 +53,7 @@ ExecutionStatus getOverallExecutionStatusOfCommands()
return ExecutionStatus(0);
}
/// Consequently tries to execute all commands and genreates final exception message for failed commands
/// Consequently tries to execute all commands and generates final exception message for failed commands
template <typename Callable, typename ... Callables>
ExecutionStatus getOverallExecutionStatusOfCommands(Callable && command, Callables && ... commands)
{


@ -157,7 +157,7 @@ void RequiredSourceColumnsMatcher::visit(const ASTFunction & node, const ASTPtr
local_aliases.push_back(name);
/// visit child with masked local aliases
visit(node.arguments->children[1], data);
RequiredSourceColumnsVisitor(data).visit(node.arguments->children[1]);
for (const auto & name : local_aliases)
data.private_aliases.erase(name);


@ -278,7 +278,7 @@ struct Settings
M(SettingBool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.") \
M(SettingBool, log_query_settings, true, "Log query settings into the query_log.") \
M(SettingBool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting has effect only when 'log_queries' is true.") \
M(SettingLogsLevel, send_logs_level, "none", "Send server text logs with specified minumum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'none'") \
M(SettingLogsLevel, send_logs_level, "none", "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'none'") \
M(SettingBool, enable_optimize_predicate_expression, 0, "If it is set to true, optimize predicates to subqueries.") \
\
M(SettingUInt64, low_cardinality_max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for LowCardinality type.") \


@ -136,7 +136,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
AggregateDataPtr overflow_row) const
{
/// For all rows.
typename Method::Key prev_key;
typename Method::Key prev_key{};
AggregateDataPtr value = nullptr;
for (size_t i = 0; i < rows; ++i)
{
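
The whole fix here is the added {}: for a POD key type, plain `typename Method::Key prev_key;` leaves the value indeterminate, while `prev_key{}` value-initializes it to zero before the first comparison. A minimal illustration with a stand-in key type:

    #include <cstdint>

    struct Key { uint64_t value; };  // stand-in for a POD Method::Key

    int main()
    {
        Key a;    // default-initialized: a.value is indeterminate; reading it is UB
        Key b{};  // value-initialized: b.value == 0
        (void)a;
        return b.value == 0 ? 0 : 1;
    }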


@ -35,8 +35,6 @@ namespace DB
namespace ErrorCodes
{
extern const int ALIAS_REQUIRED;
extern const int MULTIPLE_EXPRESSIONS_FOR_ALIAS;
extern const int EMPTY_NESTED_TABLE;
extern const int LOGICAL_ERROR;
extern const int INVALID_JOIN_ON_EXPRESSION;
@ -434,33 +432,13 @@ void optimizeUsing(const ASTSelectQuery * select_query)
void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const ASTSelectQuery * select_query,
const Names & source_columns, const NameSet & source_columns_set)
{
ASTPtr array_join_expression_list = select_query->array_join_expression_list();
if (array_join_expression_list)
if (ASTPtr array_join_expression_list = select_query->array_join_expression_list())
{
ASTs & array_join_asts = array_join_expression_list->children;
for (const auto & ast : array_join_asts)
{
const String nested_table_name = ast->getColumnName();
const String nested_table_alias = ast->getAliasOrColumnName();
if (nested_table_alias == nested_table_name && !isIdentifier(ast))
throw Exception("No alias for non-trivial value in ARRAY JOIN: " + nested_table_name,
ErrorCodes::ALIAS_REQUIRED);
if (result.array_join_alias_to_name.count(nested_table_alias) || result.aliases.count(nested_table_alias))
throw Exception("Duplicate alias in ARRAY JOIN: " + nested_table_alias,
ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS);
result.array_join_alias_to_name[nested_table_alias] = nested_table_name;
result.array_join_name_to_alias[nested_table_name] = nested_table_alias;
}
{
ArrayJoinedColumnsVisitor::Data visitor_data{result.array_join_name_to_alias,
result.array_join_alias_to_name,
result.array_join_result_to_source};
ArrayJoinedColumnsVisitor(visitor_data).visit(query);
}
ArrayJoinedColumnsVisitor::Data visitor_data{result.aliases,
result.array_join_name_to_alias,
result.array_join_alias_to_name,
result.array_join_result_to_source};
ArrayJoinedColumnsVisitor(visitor_data).visit(query);
/// If the result of ARRAY JOIN is not used, it is necessary to ARRAY-JOIN any column,
/// to get the correct number of rows.
@ -788,7 +766,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, context, subquery_depth);
/// Optimize if with constant condition after constants was substituted instead of sclalar subqueries.
/// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
if (select_query)


@ -16,7 +16,6 @@ struct SyntaxAnalyzerResult
NamesAndTypesList source_columns;
/// Note: used only in tests.
using Aliases = std::unordered_map<String, ASTPtr>;
Aliases aliases;


@ -115,7 +115,7 @@ std::vector<ASTPtr *> TranslateQualifiedNamesMatcher::visit(ASTTableJoin & join,
std::vector<ASTPtr *> TranslateQualifiedNamesMatcher::visit(ASTSelectQuery & select, const ASTPtr & , Data &)
{
/// If the WHERE clause or HAVING consists of a single quailified column, the reference must be translated not only in children,
/// If the WHERE clause or HAVING consists of a single qualified column, the reference must be translated not only in children,
/// but also in where_expression and having_expression.
std::vector<ASTPtr *> out;
if (select.prewhere_expression)


@ -330,7 +330,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
{
/// NOTE: Redundancy. The same values coulld be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
/// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
elem.result_rows = counting_stream->getProgress().rows;
elem.result_bytes = counting_stream->getProgress().bytes;
}


@ -15,7 +15,7 @@ namespace DB
* or
* (subquery)
*
* Optionally with alias (correllation name):
* Optionally with alias (correlation name):
* [AS] alias
*
* Table may contain FINAL and SAMPLE modifiers:


@ -132,7 +132,7 @@ Token Lexer::nextTokenImpl()
++pos;
}
/// exponentation (base 10 or base 2)
/// exponentiation (base 10 or base 2)
if (pos + 1 < end && (hex ? (*pos == 'p' || *pos == 'P') : (*pos == 'e' || *pos == 'E')))
{
++pos;
@ -195,7 +195,7 @@ Token Lexer::nextTokenImpl()
while (pos < end && isNumericASCII(*pos))
++pos;
/// exponentation
/// exponentiation
if (pos + 1 < end && (*pos == 'e' || *pos == 'E'))
{
++pos;
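
Both hunks fix the same spelling; for reference, the two exponent forms the lexer distinguishes at this point:

    int main()
    {
        double decimal = 1.5e10;  // base-10 exponent: 'e' or 'E'
        double hex_fp  = 0x1p-3;  // base-2 exponent: 'p' or 'P' (hexadecimal float, C++17); equals 0.125
        return (decimal > 0 && hex_fp == 0.125) ? 0 : 1;
    }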


@ -354,8 +354,6 @@ std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, s
begin = pos;
ast = parseQueryAndMovePosition(parser, pos, end, "", true, 0);
if (!ast)
break;
ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(ast.get());


@ -49,7 +49,7 @@ ASTPtr parseQuery(
/** Split queries separated by ; on to list of single queries
* Returns pointer to the end of last sucessfuly parsed query (first), and true if all queries are sucessfuly parsed (second)
* Returns pointer to the end of last successfully parsed query (first), and true if all queries are successfully parsed (second)
* NOTE: INSERT's data should be placed in single line.
*/
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list);


@ -240,6 +240,12 @@ CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_
return codec->second;
}
CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name) const
{
return getCodecOrDefault(column_name, CompressionCodecFactory::instance().getDefaultCodec());
}
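
The new one-argument overload is a thin delegator that plugs in the factory's default codec; a generic sketch of the pattern with stand-in types:

    #include <memory>
    #include <string>

    struct Codec { std::string name; };
    using CodecPtr = std::shared_ptr<Codec>;

    CodecPtr defaultCodec() { return std::make_shared<Codec>(Codec{"LZ4"}); }

    // The two-argument version does the real per-column lookup (stubbed here);
    // the convenience overload just forwards with the default.
    CodecPtr getCodecOrDefault(const std::string & /*column*/, CodecPtr fallback) { return fallback; }
    CodecPtr getCodecOrDefault(const std::string & column)
    {
        return getCodecOrDefault(column, defaultCodec());
    }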
ColumnsDescription ColumnsDescription::parse(const String & str)
{
ReadBufferFromString buf{str};


@ -69,6 +69,8 @@ struct ColumnsDescription
CompressionCodecPtr getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
CompressionCodecPtr getCodecOrDefault(const String & column_name) const;
static ColumnsDescription parse(const String & str);
static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table);


@ -48,40 +48,8 @@ namespace
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const auto address = boost::copy_range<std::string>(*it);
const char * address_begin = static_cast<const char*>(address.data());
const char * address_end = address_begin + address.size();
Protocol::Secure secure = Protocol::Secure::Disable;
const char * secure_tag = "+secure";
if (endsWith(address, secure_tag))
{
address_end -= strlen(secure_tag);
secure = Protocol::Secure::Enable;
}
const char * user_pw_end = strchr(address.data(), '@');
const char * colon = strchr(address.data(), ':');
if (!user_pw_end || !colon)
throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port#default_database' pattern",
ErrorCodes::INCORRECT_FILE_NAME};
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception{"Shard address '" + address + "' does not contain port", ErrorCodes::INCORRECT_FILE_NAME};
const char * has_db = strchr(address.data(), '#');
const char * port_end = has_db ? has_db : address_end;
const auto user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
const auto password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
const auto host = unescapeForFileName(std::string(user_pw_end + 1, host_end));
const auto port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
const auto database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end))
: std::string();
pools.emplace_back(factory(host, port, secure, user, password, database));
Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
pools.emplace_back(factory(address));
}
return pools;
@ -175,17 +143,29 @@ void StorageDistributedDirectoryMonitor::run()
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
const auto pool_factory = [&storage, &timeouts] (const std::string & host, const UInt16 port,
const Protocol::Secure secure,
const std::string & user, const std::string & password,
const std::string & default_database)
const auto pool_factory = [&storage, &timeouts] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();
/// existing connection pools have a higher priority
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index];
for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
{
const Cluster::Address & replica_address = replicas_addresses[replica_index];
if (address == replica_address)
return shards_info[shard_index].per_replica_pools[replica_index];
}
}
return std::make_shared<ConnectionPool>(
1, host, port, default_database,
user, password, timeouts,
storage.getName() + '_' + user,
Protocol::Compression::Enable,
secure);
1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts,
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
};
auto pools = createPoolsForAddresses(name, pool_factory);
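
The rewritten factory takes the whole Cluster::Address and first scans the cluster's per-replica pools for a matching address, so the directory monitor reuses the connection pool that distributed queries already hold and only constructs a fresh one as a fallback; a stand-in sketch of that reuse-first lookup:

    #include <memory>
    #include <string>
    #include <vector>

    struct Address
    {
        std::string host;
        unsigned short port = 0;
        bool operator==(const Address & other) const { return host == other.host && port == other.port; }
    };

    struct Pool { Address address; };
    using PoolPtr = std::shared_ptr<Pool>;

    // shards[i][j] describes replica j of shard i; per_replica_pools mirrors it.
    PoolPtr getOrCreatePool(const Address & address,
                            const std::vector<std::vector<Address>> & shards,
                            const std::vector<std::vector<PoolPtr>> & per_replica_pools)
    {
        for (size_t shard = 0; shard < shards.size(); ++shard)
            for (size_t replica = 0; replica < shards[shard].size(); ++replica)
                if (shards[shard][replica] == address)
                    return per_replica_pools[shard][replica];  // reuse the existing pool
        return std::make_shared<Pool>(Pool{address});          // fall back to a fresh one
    }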


@ -494,7 +494,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
if (!address.is_local)
dir_names.push_back(address.toStringFull());
dir_names.push_back(address.toFullString());
if (!dir_names.empty())
writeToShard(block, dir_names);
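
For context: asynchronous distributed writes land in one on-disk directory per remote replica, and the directory name is the escaped full address, which is why the toFullString/fromFullString pair above must round-trip. A stand-in sketch of the fan-out:

    #include <string>
    #include <vector>

    struct ReplicaAddress { bool is_local = false; std::string full_string; };

    // Local replicas are written directly and get no directory.
    std::vector<std::string> shardDirNames(const std::vector<ReplicaAddress> & replicas)
    {
        std::vector<std::string> dir_names;
        for (const auto & address : replicas)
            if (!address.is_local)
                dir_names.push_back(address.full_string);
        return dir_names;
    }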
