Merge branch 'master' into master

alexey-milovidov 2018-07-30 22:49:07 +03:00 committed by GitHub
commit 6a71a1ea13
87 changed files with 1311 additions and 362 deletions

.gitignore vendored
View File

@ -241,5 +241,6 @@ node_modules
public
website/docs
website/presentations
+website/package-lock.json
.DS_Store
*/.DS_Store

View File

@ -283,6 +283,7 @@ include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
find_contrib_lib(metrohash)
+find_contrib_lib(murmurhash2)
find_contrib_lib(btrie)
find_contrib_lib(double-conversion)

View File

@ -45,6 +45,10 @@ if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind)
endif ()
+if (USE_INTERNAL_MURMURHASH2_LIBRARY)
+add_subdirectory (libmurmurhash2)
+endif ()
if (USE_INTERNAL_ZLIB_LIBRARY)
add_subdirectory (${INTERNAL_ZLIB_NAME})
# todo: make pull to Dead2/zlib-ng and remove:

View File

@ -0,0 +1,6 @@
add_library(murmurhash2
src/murmurhash2.cpp
include/murmurhash2.h)
target_include_directories (murmurhash2 PUBLIC include)
target_include_directories (murmurhash2 PUBLIC src)

View File

@ -0,0 +1 @@
MurmurHash2 was written by Austin Appleby, and is placed in the public domain. The author hereby disclaims copyright to this source code.

View File

@ -0,0 +1,6 @@
Original URL: https://github.com/aappleby/smhasher
version:
commit 61a0530f28277f2e850bfc39600ce61d02b518de
author aappleby@gmail.com
date 2016-01-09T06:07:17Z

View File

@ -0,0 +1,35 @@
//-----------------------------------------------------------------------------
// MurmurHash2 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
#ifndef _MURMURHASH2_H_
#define _MURMURHASH2_H_
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
// Microsoft Visual Studio
#if defined(_MSC_VER) && (_MSC_VER < 1600)
typedef unsigned char uint8_t;
typedef unsigned int uint32_t;
typedef unsigned __int64 uint64_t;
// Other compilers
#else // defined(_MSC_VER)
#include <stdint.h>
#endif // !defined(_MSC_VER)
uint32_t MurmurHash2 (const void * key, int len, uint32_t seed);
uint64_t MurmurHash64A (const void * key, int len, uint64_t seed);
uint64_t MurmurHash64B (const void * key, int len, uint64_t seed);
uint32_t MurmurHash2A (const void * key, int len, uint32_t seed);
uint32_t MurmurHashNeutral2 (const void * key, int len, uint32_t seed);
uint32_t MurmurHashAligned2 (const void * key, int len, uint32_t seed);
#endif // _MURMURHASH2_H_
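
A minimal usage sketch for the functions declared above (assuming the header is on the include path, as the accompanying CMakeLists arranges); the zero seed mirrors what the murmurHash2_32 wrapper later in this diff passes:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <murmurhash2.h>

int main()
{
    const char * key = "example";
    const int len = static_cast<int>(std::strlen(key));
    uint32_t h32 = MurmurHash2(key, len, 0);      // 32-bit variant, seed 0
    uint64_t h64 = MurmurHash64A(key, len, 0);    // 64-bit variant for 64-bit platforms
    std::cout << h32 << " " << h64 << std::endl;
}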

View File

@ -0,0 +1,421 @@
// MurmurHash2 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// Note - This code makes a few assumptions about how your machine behaves -
// 1. We can read a 4-byte value from any address without crashing
// 2. sizeof(int) == 4
// And it has a few limitations -
// 1. It will not work incrementally.
// 2. It will not produce the same results on little-endian and big-endian
// machines.
#include "murmurhash2.h"
// Platform-specific functions and macros
// Microsoft Visual Studio
#if defined(_MSC_VER)
#define BIG_CONSTANT(x) (x)
// Other compilers
#else // defined(_MSC_VER)
#define BIG_CONSTANT(x) (x##LLU)
#endif // !defined(_MSC_VER)
uint32_t MurmurHash2(const void * key, int len, uint32_t seed)
{
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
const uint32_t m = 0x5bd1e995;
const int r = 24;
// Initialize the hash to a 'random' value
uint32_t h = seed ^ len;
// Mix 4 bytes at a time into the hash
const unsigned char * data = reinterpret_cast<const unsigned char *>(key);
while (len >= 4)
{
uint32_t k = *reinterpret_cast<const uint32_t *>(data);
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
data += 4;
len -= 4;
}
// Handle the last few bytes of the input array
switch (len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
// Do a few final mixes of the hash to ensure the last few
// bytes are well-incorporated.
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
// MurmurHash2, 64-bit versions, by Austin Appleby
// The same caveats as 32-bit MurmurHash2 apply here - beware of alignment
// and endian-ness issues if used across multiple platforms.
// 64-bit hash for 64-bit platforms
uint64_t MurmurHash64A(const void * key, int len, uint64_t seed)
{
const uint64_t m = BIG_CONSTANT(0xc6a4a7935bd1e995);
const int r = 47;
uint64_t h = seed ^ (len * m);
const uint64_t * data = reinterpret_cast<const uint64_t *>(key);
const uint64_t * end = data + (len/8);
while (data != end)
{
uint64_t k = *data++;
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
}
const unsigned char * data2 = reinterpret_cast<const unsigned char *>(data);
switch (len & 7)
{
case 7: h ^= static_cast<uint64_t>(data2[6]) << 48;
case 6: h ^= static_cast<uint64_t>(data2[5]) << 40;
case 5: h ^= static_cast<uint64_t>(data2[4]) << 32;
case 4: h ^= static_cast<uint64_t>(data2[3]) << 24;
case 3: h ^= static_cast<uint64_t>(data2[2]) << 16;
case 2: h ^= static_cast<uint64_t>(data2[1]) << 8;
case 1: h ^= static_cast<uint64_t>(data2[0]);
h *= m;
};
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
// 64-bit hash for 32-bit platforms
uint64_t MurmurHash64B(const void * key, int len, uint64_t seed)
{
const uint32_t m = 0x5bd1e995;
const int r = 24;
uint32_t h1 = static_cast<uint32_t>(seed) ^ len;
uint32_t h2 = static_cast<uint32_t>(seed >> 32);
const uint32_t * data = reinterpret_cast<const uint32_t *>(key);
while (len >= 8)
{
uint32_t k1 = *data++;
k1 *= m; k1 ^= k1 >> r; k1 *= m;
h1 *= m; h1 ^= k1;
len -= 4;
uint32_t k2 = *data++;
k2 *= m; k2 ^= k2 >> r; k2 *= m;
h2 *= m; h2 ^= k2;
len -= 4;
}
if (len >= 4)
{
uint32_t k1 = *data++;
k1 *= m; k1 ^= k1 >> r; k1 *= m;
h1 *= m; h1 ^= k1;
len -= 4;
}
switch (len)
{
case 3: h2 ^= reinterpret_cast<const unsigned char *>(data)[2] << 16;
case 2: h2 ^= reinterpret_cast<const unsigned char *>(data)[1] << 8;
case 1: h2 ^= reinterpret_cast<const unsigned char *>(data)[0];
h2 *= m;
};
h1 ^= h2 >> 18; h1 *= m;
h2 ^= h1 >> 22; h2 *= m;
h1 ^= h2 >> 17; h1 *= m;
h2 ^= h1 >> 19; h2 *= m;
uint64_t h = h1;
h = (h << 32) | h2;
return h;
}
// MurmurHash2A, by Austin Appleby
// This is a variant of MurmurHash2 modified to use the Merkle-Damgard
// construction. Bulk speed should be identical to Murmur2, small-key speed
// will be 10%-20% slower due to the added overhead at the end of the hash.
// This variant fixes a minor issue where null keys were more likely to
// collide with each other than expected, and also makes the function
// more amenable to incremental implementations.
#define mmix(h,k) { k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; }
uint32_t MurmurHash2A(const void * key, int len, uint32_t seed)
{
const uint32_t m = 0x5bd1e995;
const int r = 24;
uint32_t l = len;
const unsigned char * data = reinterpret_cast<const unsigned char *>(key);
uint32_t h = seed;
while (len >= 4)
{
uint32_t k = *reinterpret_cast<const uint32_t *>(data);
mmix(h,k);
data += 4;
len -= 4;
}
uint32_t t = 0;
switch (len)
{
case 3: t ^= data[2] << 16;
case 2: t ^= data[1] << 8;
case 1: t ^= data[0];
};
mmix(h,t);
mmix(h,l);
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
// MurmurHashNeutral2, by Austin Appleby
// Same as MurmurHash2, but endian- and alignment-neutral.
// Half the speed though, alas.
uint32_t MurmurHashNeutral2(const void * key, int len, uint32_t seed)
{
const uint32_t m = 0x5bd1e995;
const int r = 24;
uint32_t h = seed ^ len;
const unsigned char * data = reinterpret_cast<const unsigned char *>(key);
while (len >= 4)
{
uint32_t k;
k = data[0];
k |= data[1] << 8;
k |= data[2] << 16;
k |= data[3] << 24;
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
data += 4;
len -= 4;
}
switch (len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
//-----------------------------------------------------------------------------
// MurmurHashAligned2, by Austin Appleby
// Same algorithm as MurmurHash2, but only does aligned reads - should be safer
// on certain platforms.
// Performance will be lower than MurmurHash2
#define MIX(h,k,m) { k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; }
uint32_t MurmurHashAligned2(const void * key, int len, uint32_t seed)
{
const uint32_t m = 0x5bd1e995;
const int r = 24;
const unsigned char * data = reinterpret_cast<const unsigned char *>(key);
uint32_t h = seed ^ len;
int align = reinterpret_cast<uint64_t>(data) & 3;
if (align && (len >= 4))
{
// Pre-load the temp registers
uint32_t t = 0, d = 0;
switch (align)
{
case 1: t |= data[2] << 16;
case 2: t |= data[1] << 8;
case 3: t |= data[0];
}
t <<= (8 * align);
data += 4-align;
len -= 4-align;
int sl = 8 * (4-align);
int sr = 8 * align;
// Mix
while (len >= 4)
{
d = *(reinterpret_cast<const uint32_t *>(data));
t = (t >> sr) | (d << sl);
uint32_t k = t;
MIX(h,k,m);
t = d;
data += 4;
len -= 4;
}
// Handle leftover data in temp registers
d = 0;
if (len >= align)
{
switch (align)
{
case 3: d |= data[2] << 16;
case 2: d |= data[1] << 8;
case 1: d |= data[0];
}
uint32_t k = (t >> sr) | (d << sl);
MIX(h,k,m);
data += align;
len -= align;
//----------
// Handle tail bytes
switch (len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
}
else
{
switch (len)
{
case 3: d |= data[2] << 16;
case 2: d |= data[1] << 8;
case 1: d |= data[0];
case 0: h ^= (t >> sr) | (d << sl);
h *= m;
}
}
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
else
{
while (len >= 4)
{
uint32_t k = *reinterpret_cast<const uint32_t *>(data);
MIX(h,k,m);
data += 4;
len -= 4;
}
// Handle tail bytes
switch (len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
}

View File

@ -16,22 +16,22 @@ target_include_directories (re2_st PRIVATE . PUBLIC ${CMAKE_CURRENT_BINARY_DIR}
file (MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/re2_st)
foreach (FILENAME filtered_re2.h re2.h set.h stringpiece.h)
add_custom_command (OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/re2_st/${FILENAME}"
COMMAND ${CMAKE_COMMAND} -DSOURCE_FILENAME="${RE2_SOURCE_DIR}/re2/${FILENAME}"
-DTARGET_FILENAME="${CMAKE_CURRENT_BINARY_DIR}/re2_st/${FILENAME}"
-P "${CMAKE_CURRENT_SOURCE_DIR}/re2_transform.cmake"
COMMENT "Creating ${FILENAME} for re2_st library.")
add_custom_target (transform_${FILENAME} DEPENDS "${CMAKE_CURRENT_BINARY_DIR}/re2_st/${FILENAME}")
add_dependencies (re2_st transform_${FILENAME})
endforeach ()
file (MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/util)
foreach (FILENAME mutex.h)
add_custom_command (OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/util/${FILENAME}"
COMMAND ${CMAKE_COMMAND} -DSOURCE_FILENAME="${RE2_SOURCE_DIR}/util/${FILENAME}"
-DTARGET_FILENAME="${CMAKE_CURRENT_BINARY_DIR}/util/${FILENAME}"
-P "${CMAKE_CURRENT_SOURCE_DIR}/re2_transform.cmake"
COMMENT "Creating ${FILENAME} for re2_st library.")
add_custom_target (transform_${FILENAME} DEPENDS "${CMAKE_CURRENT_BINARY_DIR}/util/${FILENAME}")
add_dependencies (re2_st transform_${FILENAME})
endforeach ()

View File

@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
-set(VERSION_REVISION 54397 CACHE STRING "")
+set(VERSION_REVISION 54399 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
-set(VERSION_MINOR 2 CACHE STRING "")
+set(VERSION_MINOR 4 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
-set(VERSION_GITHASH 6ad677d7d6961a0c9088ccd9eff55779cfdaa654 CACHE STRING "")
+set(VERSION_GITHASH 8bc95412b66b360fdbef5bb0cec5217378f066a6 CACHE STRING "")
-set(VERSION_DESCRIBE v18.2.0-testing CACHE STRING "")
+set(VERSION_DESCRIBE v18.4.0-testing CACHE STRING "")
-set(VERSION_STRING 18.2.0 CACHE STRING "")
+set(VERSION_STRING 18.4.0 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -86,9 +86,106 @@ namespace ErrorCodes
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int UNEXPECTED_PACKET_FROM_SERVER;
extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED;
+extern const int LOGICAL_ERROR;
}
/// Checks expected server and client error codes in testmode.
/// To enable it add special comment after the query: "-- { serverError 60 }" or "-- { clientError 20 }".
class TestHint
{
public:
TestHint(bool enabled_, const String & query)
: enabled(enabled_),
server_error(0),
client_error(0)
{
if (!enabled_)
return;
size_t pos = query.find("--");
if (pos != String::npos && query.find("--", pos + 2) != String::npos)
return; /// It's not last comment. Hint belongs to commented query.
if (pos != String::npos)
{
pos = query.find('{', pos + 2);
if (pos != String::npos)
{
String hint = query.substr(pos + 1);
pos = hint.find('}');
hint.resize(pos);
parse(hint);
}
}
}
/// @returns true if it's possible to continue without reconnect
bool checkActual(int & actual_server_error, int & actual_client_error,
bool & got_exception, std::unique_ptr<Exception> & last_exception) const
{
if (!enabled)
return true;
if (allErrorsExpected(actual_server_error, actual_client_error))
{
got_exception = false;
last_exception.reset();
actual_server_error = 0;
actual_client_error = 0;
return false;
}
if (lostExpectedError(actual_server_error, actual_client_error))
{
std::cerr << "Success when error expected. It expects server error "
<< server_error << ", client error " << client_error << "." << std::endl;
got_exception = true;
last_exception = std::make_unique<Exception>("Success when error expected", ErrorCodes::LOGICAL_ERROR); /// return error to OS
return false;
}
return true;
}
int serverError() const { return server_error; }
int clientError() const { return client_error; }
private:
bool enabled;
int server_error;
int client_error;
void parse(const String & hint)
{
std::stringstream ss;
ss << hint;
while (!ss.eof())
{
String item;
ss >> item;
if (item.empty())
break;
if (item == "serverError")
ss >> server_error;
else if (item == "clientError")
ss >> client_error;
}
}
bool allErrorsExpected(int actual_server_error, int actual_client_error) const
{
return (server_error || client_error) && (server_error == actual_server_error) && (client_error == actual_client_error);
}
bool lostExpectedError(int actual_server_error, int actual_client_error) const
{
return (server_error && !actual_server_error) || (client_error && !actual_client_error);
}
};
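
A hedged usage sketch for the class above (the query text is an illustrative example, not taken from this commit): the hint must be the last comment of the query, and the parsed codes are later compared with what the client actually observed.

    // Sketch only: exercising TestHint directly.
    TestHint hint(/* enabled_ = */ true, "SELECT * FROM unknown_table -- { serverError 60 }");
    // hint.serverError() == 60 and hint.clientError() == 0; in --testmode the client passes the
    // codes it actually observed to hint.checkActual(...) to decide whether the query passed
    // or whether the connection has to be re-established.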
class Client : public Poco::Util::Application
{
public:
@ -157,6 +254,10 @@ private:
/// If the last query resulted in exception.
bool got_exception = false;
+int expected_server_error = 0;
+int expected_client_error = 0;
+int actual_server_error = 0;
+int actual_client_error = 0;
String server_version;
String server_display_name;
@ -617,10 +718,14 @@ private:
}
catch (const Exception & e)
{
-std::cerr << std::endl
-<< "Exception on client:" << std::endl
-<< "Code: " << e.code() << ". " << e.displayText() << std::endl
-<< std::endl;
+actual_client_error = e.code();
+if (!actual_client_error || actual_client_error != expected_client_error)
+{
+std::cerr << std::endl
+<< "Exception on client:" << std::endl
+<< "Code: " << e.code() << ". " << e.displayText() << std::endl
+<< std::endl;
+}
/// Client-side exception during query execution can result in the loss of
/// sync in the connection protocol.
@ -659,6 +764,7 @@ private:
bool process(const String & text)
{
const auto ignore_error = config().getBool("ignore-error", false);
+const bool test_mode = config().has("testmode");
if (config().has("multiquery"))
{
/// Several queries separated by ';'.
@ -702,6 +808,10 @@ private:
while (isWhitespaceASCII(*begin) || *begin == ';')
++begin;
+TestHint test_hint(test_mode, query);
+expected_client_error = test_hint.clientError();
+expected_server_error = test_hint.serverError();
try
{
if (!processSingleQuery(query, ast) && !ignore_error)
@ -709,10 +819,15 @@ private:
}
catch (...)
{
-std::cerr << "Error on processing query: " << query << std::endl << getCurrentExceptionMessage(true);
+actual_client_error = getCurrentExceptionCode();
+if (!actual_client_error || actual_client_error != expected_client_error)
+std::cerr << "Error on processing query: " << query << std::endl << getCurrentExceptionMessage(true);
got_exception = true;
}
+if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception))
+connection->forceConnected();
if (got_exception && !ignore_error)
{
if (is_interactive)
@ -1286,6 +1401,14 @@ private:
resetOutput();
got_exception = true;
+actual_server_error = e.code();
+if (expected_server_error)
+{
+if (actual_server_error == expected_server_error)
+return;
+std::cerr << "Expected error code: " << expected_server_error << " but got: " << actual_server_error << "." << std::endl;
+}
std::string text = e.displayText();
auto embedded_stack_trace_pos = text.find("Stack trace");
@ -1411,7 +1534,8 @@ public:
("pager", boost::program_options::value<std::string>(), "pager")
("multiline,m", "multiline")
("multiquery,n", "multiquery")
-("ignore-error", "Do not stop processing in multiquery mode")
+("testmode,T", "enable test hints in comments")
+("ignore-error", "do not stop processing in multiquery mode")
("format,f", boost::program_options::value<std::string>(), "default output format")
("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
@ -1517,6 +1641,8 @@ public:
config().setBool("multiline", true);
if (options.count("multiquery"))
config().setBool("multiquery", true);
+if (options.count("testmode"))
+config().setBool("testmode", true);
if (options.count("ignore-error"))
config().setBool("ignore-error", true);
if (options.count("format"))

View File

@ -646,7 +646,7 @@ struct AggregateFunctionAnyHeavyData : Data
}
else
{
-if (counter < to.counter)
+if ((!this->has() && to.has()) || counter < to.counter)
{
this->change(to, arena);
return true;

View File

@ -2,6 +2,7 @@
#include <sys/utsname.h>
#include <cerrno>
+#include <cstdlib>
#include <cstring>
#include <algorithm>
#include <iostream>
@ -103,7 +104,9 @@ static ElementIdentifier getElementIdentifier(Node * element)
{
const Node * node = attrs->item(i);
std::string name = node->nodeName();
-if (name == "replace" || name == "remove" || name == "incl" || name == "from_zk")
+auto subst_name_pos = std::find(ConfigProcessor::SUBSTITUTION_ATTRS.begin(), ConfigProcessor::SUBSTITUTION_ATTRS.end(), name);
+if (name == "replace" || name == "remove" ||
+subst_name_pos != ConfigProcessor::SUBSTITUTION_ATTRS.end())
continue;
std::string value = node->nodeValue();
attrs_kv.push_back(std::make_pair(name, value));
@ -267,12 +270,18 @@ void ConfigProcessor::doIncludesRecursive(
return;
}
+std::map<std::string, const Node *> attr_nodes;
NamedNodeMapPtr attributes = node->attributes();
-const Node * incl_attribute = attributes->getNamedItem("incl");
-const Node * from_zk_attribute = attributes->getNamedItem("from_zk");
+size_t substs_count = 0;
+for (const auto & attr_name : SUBSTITUTION_ATTRS)
+{
+auto subst = attributes->getNamedItem(attr_name);
+attr_nodes[attr_name] = subst;
+substs_count += static_cast<size_t>(subst == nullptr);
+}
-if (incl_attribute && from_zk_attribute)
-throw Poco::Exception("both incl and from_zk attributes set for element <" + node->nodeName() + ">");
+if (substs_count < SUBSTITUTION_ATTRS.size() - 1) /// only one substitution is allowed
+throw Poco::Exception("several substitutions attributes set for element <" + node->nodeName() + ">");
/// Replace the original contents, not add to it.
bool replace = attributes->getNamedItem("replace");
@ -296,8 +305,8 @@ void ConfigProcessor::doIncludesRecursive(
{
Element & element = dynamic_cast<Element &>(*node);
-element.removeAttribute("incl");
-element.removeAttribute("from_zk");
+for (const auto & attr_name : SUBSTITUTION_ATTRS)
+element.removeAttribute(attr_name);
if (replace)
{
@ -324,16 +333,19 @@ void ConfigProcessor::doIncludesRecursive(
}
};
-auto get_incl_node = [&](const std::string & name)
-{
-return include_from ? include_from->getNodeByPath("yandex/" + name) : nullptr;
-};
-if (incl_attribute)
-process_include(incl_attribute, get_incl_node, "Include not found: ");
-if (from_zk_attribute)
+if (attr_nodes["incl"]) // we have include subst
+{
+auto get_incl_node = [&](const std::string & name)
+{
+return include_from ? include_from->getNodeByPath("yandex/" + name) : nullptr;
+};
+process_include(attr_nodes["incl"], get_incl_node, "Include not found: ");
+}
+if (attr_nodes["from_zk"]) /// we have zookeeper subst
{
-contributing_zk_paths.insert(from_zk_attribute->getNodeValue());
+contributing_zk_paths.insert(attr_nodes["from_zk"]->getNodeValue());
if (zk_node_cache)
{
@ -349,10 +361,27 @@ void ConfigProcessor::doIncludesRecursive(
return getRootNode(zk_document.get());
};
-process_include(from_zk_attribute, get_zk_node, "Could not get ZooKeeper node: ");
+process_include(attr_nodes["from_zk"], get_zk_node, "Could not get ZooKeeper node: ");
}
}
+if (attr_nodes["from_env"]) /// we have env subst
+{
+XMLDocumentPtr env_document;
+auto get_env_node = [&](const std::string & name) -> const Node *
+{
+const char * env_val = std::getenv(name.c_str());
+if (env_val == nullptr)
+return nullptr;
+env_document = dom_parser.parseString("<from_env>" + std::string{env_val} + "</from_env>");
+return getRootNode(env_document.get());
+};
+process_include(attr_nodes["from_env"], get_env_node, "Env variable is not set: ");
+}
if (included_something)
doIncludesRecursive(config, include_from, node, zk_node_cache, contributing_zk_paths);
else
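
The added from_env branch above fills an element from an environment variable named by the attribute. A minimal standalone sketch of that idea (hypothetical helper, not the ClickHouse API):

#include <cstdlib>
#include <iostream>
#include <string>

// Hypothetical helper: resolve a from_env substitution for one element.
// Returns "<element>value</element>", or an empty string when the variable is unset
// (the error message the real code reports in that case is "Env variable is not set").
std::string substituteFromEnv(const std::string & element, const std::string & env_name)
{
    const char * env_val = std::getenv(env_name.c_str());
    if (env_val == nullptr)
        return {};
    return "<" + element + ">" + env_val + "</" + element + ">";
}

int main()
{
    // With TCP_PORT=9000 in the environment this prints <tcp_port>9000</tcp_port>.
    std::cout << substituteFromEnv("tcp_port", "TCP_PORT") << std::endl;
}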

View File

@ -95,6 +95,8 @@ public:
/// Is the file named as result of config preprocessing, not as original files.
static bool isPreprocessedFile(const std::string & config_path);
+static inline const auto SUBSTITUTION_ATTRS = {"incl", "from_zk", "from_env"};
private:
const std::string path;
const std::string preprocessed_path;

View File

@ -91,7 +91,7 @@ list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h Func
add_library(clickhouse_functions ${clickhouse_functions_sources})
-target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES})
+target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES} ${MURMURHASH2_LIBRARIES})
target_include_directories (clickhouse_functions SYSTEM BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})

View File

@ -20,22 +20,7 @@ void registerFunctionsHashing(FunctionFactory & factory)
factory.registerFunction<FunctionIntHash32>();
factory.registerFunction<FunctionIntHash64>();
factory.registerFunction<FunctionURLHash>();
+factory.registerFunction<FunctionMurmurHash2>();
}
-template <>
-UInt64 toInteger<Float32>(Float32 x)
-{
-UInt32 res;
-memcpy(&res, &x, sizeof(x));
-return res;
-}
-template <>
-UInt64 toInteger<Float64>(Float64 x)
-{
-UInt64 res;
-memcpy(&res, &x, sizeof(x));
-return res;
-}
}

View File

@ -5,6 +5,7 @@
#include <city.h>
#include <farmhash.h>
#include <metrohash.h>
+#include <murmurhash2.h>
#include <Poco/ByteOrder.h>
@ -29,6 +30,7 @@
#include <Functions/FunctionHelpers.h>
#include <ext/range.h>
+#include <ext/bit_cast.h>
namespace DB
@ -143,6 +145,7 @@ struct SipHash64Impl
}
};
struct SipHash128Impl
{
static constexpr auto name = "sipHash128";
@ -213,10 +216,15 @@ public:
size_t size = offsets.size();
vec_to.resize(size);
+ColumnString::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
+{
vec_to[i] = Impl::apply(
-reinterpret_cast<const char *>(&data[i == 0 ? 0 : offsets[i - 1]]),
-i == 0 ? offsets[i] - 1 : (offsets[i] - 1 - offsets[i - 1]));
+reinterpret_cast<const char *>(&data[current_offset]),
+offsets[i] - current_offset - 1);
+current_offset = offsets[i];
+}
block.getByPosition(result).column = std::move(col_to);
}
@ -265,12 +273,17 @@ public:
const auto size = offsets.size();
chars_to.resize(size * Impl::length);
+ColumnString::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
+{
Impl::apply(
-reinterpret_cast<const char *>(&data[i == 0 ? 0 : offsets[i - 1]]),
-i == 0 ? offsets[i] - 1 : (offsets[i] - 1 - offsets[i - 1]),
+reinterpret_cast<const char *>(&data[current_offset]),
+offsets[i] - current_offset - 1,
&chars_to[i * Impl::length]);
+current_offset = offsets[i];
+}
block.getByPosition(result).column = std::move(col_to);
}
else
@ -354,19 +367,6 @@ public:
};
-template <typename T>
-static UInt64 toInteger(T x)
-{
-return x;
-}
-template <>
-UInt64 toInteger<Float32>(Float32 x);
-template <>
-UInt64 toInteger<Float64>(Float64 x);
/** We use hash functions called CityHash, FarmHash, MetroHash.
* In this regard, this template is named with the words `NeighborhoodHash`.
*/
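
The removed toInteger<Float32> / toInteger<Float64> helpers are replaced in the hunks below by ext::bit_cast<UInt64>(...). A minimal sketch of what such a bit cast does (hypothetical helper name, assuming the destination type is at least as wide as the source; not the actual ext:: implementation):

#include <cstdint>
#include <cstring>

// Copy the object representation of `from` into a zero-initialized To.
// For Float32 -> UInt64 the upper bytes stay zero, matching what the removed
// toInteger<Float32> specialization produced via memcpy into a UInt32.
template <typename To, typename From>
To bit_cast_sketch(const From & from)
{
    static_assert(sizeof(To) >= sizeof(From), "To must be at least as wide as From");
    To to{};
    std::memcpy(&to, &from, sizeof(From));
    return to;
}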
@ -387,7 +387,7 @@ private:
size_t size = vec_from.size();
for (size_t i = 0; i < size; ++i)
{
-UInt64 h = IntHash64Impl::apply(toInteger(vec_from[i]));
+UInt64 h = IntHash64Impl::apply(ext::bit_cast<UInt64>(vec_from[i]));
if (first)
vec_to[i] = h;
else
@ -396,7 +396,7 @@ private:
}
else if (auto col_from = checkAndGetColumnConst<ColumnVector<FromType>>(column))
{
-const UInt64 hash = IntHash64Impl::apply(toInteger(col_from->template getValue<FromType>()));
+const UInt64 hash = IntHash64Impl::apply(ext::bit_cast<UInt64>(col_from->template getValue<FromType>()));
size_t size = vec_to.size();
if (first)
{
@ -423,15 +423,19 @@ private:
const typename ColumnString::Offsets & offsets = col_from->getOffsets();
size_t size = offsets.size();
+ColumnString::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
const UInt64 h = Impl::Hash64(
-reinterpret_cast<const char *>(&data[i == 0 ? 0 : offsets[i - 1]]),
-i == 0 ? offsets[i] - 1 : (offsets[i] - 1 - offsets[i - 1]));
+reinterpret_cast<const char *>(&data[current_offset]),
+offsets[i] - current_offset - 1);
if (first)
vec_to[i] = h;
else
vec_to[i] = Impl::Hash128to64(typename Impl::uint128_t(vec_to[i], h));
+current_offset = offsets[i];
}
}
else if (const ColumnFixedString * col_from = checkAndGetColumn<ColumnFixedString>(column))
@ -439,6 +443,7 @@ private:
const typename ColumnString::Chars_t & data = col_from->getChars();
size_t n = col_from->getN();
size_t size = data.size() / n;
for (size_t i = 0; i < size; ++i)
{
const UInt64 h = Impl::Hash64(reinterpret_cast<const char *>(&data[i * n]), n);
@ -453,6 +458,7 @@ private:
String value = col_from->getValue<String>().data();
const UInt64 hash = Impl::Hash64(value.data(), value.size());
const size_t size = vec_to.size();
if (first)
{
vec_to.assign(size, hash);
@ -487,19 +493,21 @@ private:
const size_t size = offsets.size();
+ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
-const size_t begin = i == 0 ? 0 : offsets[i - 1];
-const size_t end = offsets[i];
-UInt64 h = IntHash64Impl::apply(end - begin);
+ColumnArray::Offset next_offset = offsets[i];
+UInt64 h = IntHash64Impl::apply(next_offset - current_offset);
if (first)
vec_to[i] = h;
else
vec_to[i] = Impl::Hash128to64(typename Impl::uint128_t(vec_to[i], h));
-for (size_t j = begin; j < end; ++j)
+for (size_t j = current_offset; j < next_offset; ++j)
vec_to[i] = Impl::Hash128to64(typename Impl::uint128_t(vec_to[i], vec_temp[j]));
+current_offset = offsets[i];
}
}
else if (const ColumnConst * col_from = checkAndGetColumnConst<ColumnArray>(column))
@ -614,6 +622,124 @@ public:
};
template <typename Impl>
class FunctionStringHash32 : public IFunction
{
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionStringHash32>(); }
String getName() const override { return name; }
bool isVariadic() const override { return false; }
size_t getNumberOfArguments() const override { return 1; }
DataTypePtr getReturnTypeImpl(const DataTypes & /* arguments */) const override { return std::make_shared<DataTypeUInt32>(); }
bool useDefaultImplementationForConstants() const override { return true; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
auto col_to = ColumnUInt32::create(input_rows_count);
ColumnUInt32::Container & vec_to = col_to->getData();
const ColumnWithTypeAndName & col = block.getByPosition(arguments[0]);
const IDataType * from_type = col.type.get();
const IColumn * icolumn = col.column.get();
if (checkDataType<DataTypeUInt8>(from_type)) executeIntType<UInt8>(icolumn, vec_to);
else if (checkDataType<DataTypeUInt16>(from_type)) executeIntType<UInt16>(icolumn, vec_to);
else if (checkDataType<DataTypeUInt32>(from_type)) executeIntType<UInt32>(icolumn, vec_to);
else if (checkDataType<DataTypeUInt64>(from_type)) executeIntType<UInt64>(icolumn, vec_to);
else if (checkDataType<DataTypeInt8>(from_type)) executeIntType<Int8>(icolumn, vec_to);
else if (checkDataType<DataTypeInt16>(from_type)) executeIntType<Int16>(icolumn, vec_to);
else if (checkDataType<DataTypeInt32>(from_type)) executeIntType<Int32>(icolumn, vec_to);
else if (checkDataType<DataTypeInt64>(from_type)) executeIntType<Int64>(icolumn, vec_to);
else if (checkDataType<DataTypeEnum8>(from_type)) executeIntType<Int8>(icolumn, vec_to);
else if (checkDataType<DataTypeEnum16>(from_type)) executeIntType<Int16>(icolumn, vec_to);
else if (checkDataType<DataTypeDate>(from_type)) executeIntType<UInt16>(icolumn, vec_to);
else if (checkDataType<DataTypeDateTime>(from_type)) executeIntType<UInt32>(icolumn, vec_to);
else if (checkDataType<DataTypeFloat32>(from_type)) executeIntType<Float32>(icolumn, vec_to);
else if (checkDataType<DataTypeFloat64>(from_type)) executeIntType<Float64>(icolumn, vec_to);
else if (checkDataType<DataTypeString>(from_type)) executeString(icolumn, vec_to);
else if (checkDataType<DataTypeFixedString>(from_type)) executeString(icolumn, vec_to);
else
throw Exception("Unexpected type " + from_type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
block.getByPosition(result).column = std::move(col_to);
}
private:
template <typename FromType>
void executeIntType(const IColumn * column, ColumnUInt32::Container & vec_to)
{
if (const ColumnVector<FromType> * col_from = checkAndGetColumn<ColumnVector<FromType>>(column))
{
const typename ColumnVector<FromType>::Container & vec_from = col_from->getData();
size_t size = vec_from.size();
for (size_t i = 0; i < size; ++i)
{
vec_to[i] = Impl::Hash32(reinterpret_cast<const char *>(&vec_from[i]), sizeof(FromType));
}
}
else
throw Exception("Illegal column " + column->getName()
+ " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
void executeString(const IColumn * column, ColumnUInt32::Container & vec_to)
{
if (const ColumnString * col_from = checkAndGetColumn<ColumnString>(column))
{
const typename ColumnString::Chars_t & data = col_from->getChars();
const typename ColumnString::Offsets & offsets = col_from->getOffsets();
size_t size = offsets.size();
ColumnString::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
vec_to[i] = Impl::Hash32(
reinterpret_cast<const char *>(&data[current_offset]),
offsets[i] - current_offset - 1);
current_offset = offsets[i];
}
}
else if (const ColumnFixedString * col_from = checkAndGetColumn<ColumnFixedString>(column))
{
const typename ColumnString::Chars_t & data = col_from->getChars();
size_t n = col_from->getN();
size_t size = data.size() / n;
for (size_t i = 0; i < size; ++i)
vec_to[i] = Impl::Hash32(reinterpret_cast<const char *>(&data[i * n]), n);
}
else
throw Exception("Illegal column " + column->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};
/** Why we need MurmurHash2?
* MurmurHash2 is an outdated hash function, superseded by MurmurHash3 and subsequently by CityHash, xxHash, HighwayHash.
* Usually there is no reason to use MurmurHash.
* It is needed for the cases when you already have MurmurHash in some applications and you want to reproduce it
* in ClickHouse as is. For example, it is needed to reproduce the behaviour
* for NGINX a/b testing module: https://nginx.ru/en/docs/http/ngx_http_split_clients_module.html
*/
struct MurmurHash2Impl
{
static constexpr auto name = "murmurHash2_32";
static UInt32 Hash32(const char * data, const size_t size)
{
return MurmurHash2(data, size, 0);
}
};
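
The Impl pattern above is the whole integration surface for FunctionStringHash32: a struct only needs a name and a static Hash32(const char *, size_t). A hypothetical illustration (not part of this commit; UInt32 is the ClickHouse-wide typedef used throughout this header):

// Hypothetical Impl, shown only to illustrate the extension point.
struct Fnv1a32Impl
{
    static constexpr auto name = "fnv1a_32_sketch";   // made-up function name

    static UInt32 Hash32(const char * data, const size_t size)
    {
        UInt32 h = 0x811c9dc5;                        // FNV-1a 32-bit offset basis
        for (size_t i = 0; i < size; ++i)
        {
            h ^= static_cast<unsigned char>(data[i]);
            h *= 0x01000193;                          // FNV-1a 32-bit prime
        }
        return h;
    }
};
// It would be wired up the same way as MurmurHash2Impl at the end of this file:
// using FunctionFnv1a32 = FunctionStringHash32<Fnv1a32Impl>;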
struct URLHashImpl
{
static UInt64 apply(const char * data, const size_t size)
@ -748,10 +874,15 @@ private:
const auto & offsets = col_from->getOffsets();
auto & out = col_to->getData();
-for (const auto i : ext::range(0, size))
+ColumnString::Offset current_offset = 0;
+for (size_t i = 0; i < size; ++i)
+{
out[i] = URLHashImpl::apply(
-reinterpret_cast<const char *>(&chars[i == 0 ? 0 : offsets[i - 1]]),
-i == 0 ? offsets[i] - 1 : (offsets[i] - 1 - offsets[i - 1]));
+reinterpret_cast<const char *>(&chars[current_offset]),
+offsets[i] - current_offset - 1);
+current_offset = offsets[i];
+}
block.getByPosition(result).column = std::move(col_to);
}
@ -778,10 +909,16 @@ private:
const auto & offsets = col_from->getOffsets();
auto & out = col_to->getData();
-for (const auto i : ext::range(0, size))
-out[i] = URLHierarchyHashImpl::apply(level,
-reinterpret_cast<const char *>(&chars[i == 0 ? 0 : offsets[i - 1]]),
-i == 0 ? offsets[i] - 1 : (offsets[i] - 1 - offsets[i - 1]));
+ColumnString::Offset current_offset = 0;
+for (size_t i = 0; i < size; ++i)
+{
+out[i] = URLHierarchyHashImpl::apply(
+level,
+reinterpret_cast<const char *>(&chars[current_offset]),
+offsets[i] - current_offset - 1);
+current_offset = offsets[i];
+}
block.getByPosition(result).column = std::move(col_to);
}
@ -848,5 +985,5 @@ using FunctionSipHash128 = FunctionStringHashFixedString<SipHash128Impl>;
using FunctionCityHash64 = FunctionNeighbourhoodHash64<ImplCityHash64>;
using FunctionFarmHash64 = FunctionNeighbourhoodHash64<ImplFarmHash64>;
using FunctionMetroHash64 = FunctionNeighbourhoodHash64<ImplMetroHash64>;
+using FunctionMurmurHash2 = FunctionStringHash32<MurmurHash2Impl>;
}

View File

@ -19,9 +19,9 @@ void registerFunctionsNull(FunctionFactory & factory)
{
factory.registerFunction<FunctionIsNull>();
factory.registerFunction<FunctionIsNotNull>();
-factory.registerFunction<FunctionCoalesce>();
-factory.registerFunction<FunctionIfNull>();
-factory.registerFunction<FunctionNullIf>();
+factory.registerFunction<FunctionCoalesce>(FunctionFactory::CaseInsensitive);
+factory.registerFunction<FunctionIfNull>(FunctionFactory::CaseInsensitive);
+factory.registerFunction<FunctionNullIf>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionAssumeNotNull>();
factory.registerFunction<FunctionToNullable>();
}

View File

@ -93,16 +93,16 @@ void SelectStreamFactory::createForShard(
if (shard_info.isLocal())
{
StoragePtr main_table_storage;
if (table_func_ptr)
{
-auto table_function = static_cast<ASTFunction *>(table_func_ptr.get());
+auto table_function = static_cast<const ASTFunction *>(table_func_ptr.get());
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);
if (!main_table_storage) /// Table is absent on a local server.
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);

View File

@ -19,7 +19,7 @@ public:
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables);
/// TableFunction in a query.
SelectStreamFactory(
const Block & header_,

View File

@ -559,7 +559,7 @@ void ExpressionAnalyzer::analyzeAggregation()
const auto & col = block.getByName(column_name);
/// Constant expressions have non-null column pointer at this stage.
-if (const auto is_constexpr = col.column)
+if (col.column && col.column->isColumnConst())
{
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)

View File

@ -57,13 +57,12 @@ ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context &
/// Branch with string in query.
if (typeid_cast<const ASTLiteral *>(node.get()))
return node;
/// Branch with TableFunction in query.
if (auto table_func_ptr = typeid_cast<ASTFunction *>(node.get()))
if (TableFunctionFactory::instance().isTableFunctionName(table_func_ptr->name))
return node;
return std::make_shared<ASTLiteral>(evaluateConstantExpression(node, context).first);
}

View File

@ -372,9 +372,9 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
ASTPtr table = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
if (!database_name.empty())
{
ASTPtr database = std::make_shared<ASTIdentifier>(database_name, ASTIdentifier::Database);
@ -405,7 +405,7 @@ void ASTSelectQuery::addTableFunction(ASTPtr & table_function_ptr)
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
table_expression->table_function = table_function_ptr;
table_expression->database_and_table_name = nullptr;
}

View File

@ -20,7 +20,6 @@ ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicated
, log(&Logger::get(log_name))
{
task = storage_.context.getSchedulePool().createTask(log_name, [this]{ run(); });
-task->schedule();
}
void ReplicatedMergeTreeAlterThread::run()

View File

@ -23,6 +23,14 @@ class ReplicatedMergeTreeAlterThread
public:
ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
+void start()
+{
+task->activate();
+task->schedule();
+}
+void stop() { task->deactivate(); }
private:
void run();

View File

@ -108,7 +108,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
last_block_is_duplicate = false;
/// TODO Is it possible to not lock the table structure here?
-storage.data.delayInsertOrThrowIfNeeded(&storage.restarting_thread->getWakeupEvent());
+storage.data.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);

View File

@ -21,7 +21,6 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
, log(&Logger::get(log_name))
{
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
-task->schedule();
}
void ReplicatedMergeTreeCleanupThread::run()

View File

@ -24,7 +24,15 @@ class ReplicatedMergeTreeCleanupThread
public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
-void schedule() { task->schedule(); }
+void start()
+{
+task->activate();
+task->schedule();
+}
+void wakeup() { task->schedule(); }
+void stop() { task->deactivate(); }
private:
StorageReplicatedMergeTree & storage;

View File

@ -57,7 +57,29 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{
-completeShutdown();
+try
+{
+/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
+need_stop = true;
+task->deactivate();
+LOG_TRACE(log, "Restarting thread finished");
+/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
+storage.fetcher.blocker.cancelForever();
+storage.merger_mutator.actions_blocker.cancelForever();
+/// Stop other tasks.
+partialShutdown();
+if (storage.queue_task_handle)
+storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
+storage.queue_task_handle.reset();
+}
+catch (...)
+{
+tryLogCurrentException(log, __PRETTY_FUNCTION__);
+}
}
void ReplicatedMergeTreeRestartingThread::run()
@ -167,29 +189,6 @@ void ReplicatedMergeTreeRestartingThread::run()
task->scheduleAfter(check_period_ms);
}
-void ReplicatedMergeTreeRestartingThread::completeShutdown()
-{
-try
-{
-storage.data_parts_exchange_endpoint_holder->getBlocker().cancelForever();
-storage.data_parts_exchange_endpoint_holder = nullptr;
-/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
-storage.fetcher.blocker.cancelForever();
-storage.merger_mutator.actions_blocker.cancelForever();
-partialShutdown();
-if (storage.queue_task_handle)
-storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
-storage.queue_task_handle.reset();
-}
-catch (...)
-{
-tryLogCurrentException(log, __PRETTY_FUNCTION__);
-}
-}
bool ReplicatedMergeTreeRestartingThread::tryStartup()
{
@ -205,16 +204,16 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
/// Anything above can throw a KeeperException if something is wrong with ZK.
/// Anything below should not throw exceptions.
-storage.shutdown_called = false;
-storage.shutdown_event.reset();
+storage.partial_shutdown_called = false;
+storage.partial_shutdown_event.reset();
storage.queue_updating_task->activate();
storage.queue_updating_task->schedule();
storage.mutations_updating_task->activate();
storage.mutations_updating_task->schedule();
+storage.cleanup_thread.start();
+storage.alter_thread.start();
storage.part_check_thread.start();
-storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
-storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
if (!storage.queue_task_handle)
storage.queue_task_handle = storage.context.getBackgroundPool().addTask(
@ -350,8 +349,8 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
-storage.shutdown_called = true;
-storage.shutdown_event.set();
+storage.partial_shutdown_called = true;
+storage.partial_shutdown_event.set();
storage.alter_query_event->set();
storage.replica_is_active_node = nullptr;
@ -362,8 +361,8 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
-storage.cleanup_thread.reset();
-storage.alter_thread.reset();
+storage.cleanup_thread.stop();
+storage.alter_thread.stop();
storage.part_check_thread.stop();
LOG_TRACE(log, "Threads finished");

View File

@ -25,28 +25,12 @@ public:
ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeRestartingThread();
-void wakeup()
-{
-wakeup_event.set();
-task->schedule();
-}
-Poco::Event & getWakeupEvent()
-{
-return wakeup_event;
-}
-void stop()
-{
-need_stop = true;
-wakeup_event.set();
-}
+void wakeup() { task->schedule(); }
private:
StorageReplicatedMergeTree & storage;
String log_name;
Logger * log;
-Poco::Event wakeup_event;
std::atomic<bool> need_stop {false};
/// The random data we wrote into `/replicas/me/is_active`.
@ -59,7 +43,6 @@ private:
bool startup_completed = false;
void run();
-void completeShutdown();
/// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper.
bool tryStartup(); /// Returns false if ZooKeeper is not available.

View File

@ -244,19 +244,19 @@ BlockInputStreams StorageDistributed::read(
processed_stage = result_size == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, Names{}, processed_stage).getSampleBlock());
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ?
ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}

View File

@ -41,7 +41,7 @@ public:
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_);
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
@ -155,7 +155,7 @@ protected:
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
StorageDistributed(
const String & database_name,
const String & table_name_,

View File

@ -128,8 +128,9 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr),
-current_pending(false), log(log_), read_messages(0), row_delimiter(row_delimiter_) {
-LOG_TRACE(log, "row delimiter is :" << row_delimiter);
+current_pending(false), log(log_), read_messages(0), row_delimiter(row_delimiter_)
+{
+LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
}
~ReadBufferFromKafkaConsumer() { reset(); }


@ -215,7 +215,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
[this] (const std::string & name) { enqueuePartForCheck(name); }), [this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this), reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this),
fetcher(data), fetcher(data),
shutdown_event(false), part_check_thread(*this), cleanup_thread(*this), alter_thread(*this), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{ {
if (path_.empty()) if (path_.empty())
@ -1653,7 +1653,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// We want to remove dropped parts from disk as soon as possible /// We want to remove dropped parts from disk as soon as possible
/// To be removed a partition should have zero refcount, therefore call the cleanup thread at exit /// To be removed a partition should have zero refcount, therefore call the cleanup thread at exit
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread->schedule(); cleanup_thread.wakeup();
} }
@ -2034,7 +2034,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
res_parts.clear(); res_parts.clear();
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread->schedule(); cleanup_thread.wakeup();
return true; return true;
} }
@ -2042,10 +2042,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
void StorageReplicatedMergeTree::queueUpdatingTask() void StorageReplicatedMergeTree::queueUpdatingTask()
{ {
//most probably this check is not relevant
if (shutdown_called)
return;
if (!queue_update_in_progress) if (!queue_update_in_progress)
{ {
last_queue_update_start_time.store(time(nullptr)); last_queue_update_start_time.store(time(nullptr));
@ -2668,7 +2664,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
{ {
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
cleanup_thread->schedule(); cleanup_thread.wakeup();
return false; return false;
} }
@ -2791,11 +2787,7 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::shutdown() void StorageReplicatedMergeTree::shutdown()
{ {
if (restarting_thread) restarting_thread.reset();
{
restarting_thread->stop();
restarting_thread.reset();
}
if (data_parts_exchange_endpoint_holder) if (data_parts_exchange_endpoint_holder)
{ {
@ -3030,7 +3022,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
{ {
LOG_DEBUG(log, "Waiting for " << replica << " to apply changes"); LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
while (!shutdown_called) while (!partial_shutdown_called)
{ {
/// Replica could be inactive. /// Replica could be inactive.
if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
@ -3095,7 +3087,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
} }
} }
if (shutdown_called) if (partial_shutdown_called)
throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.", throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
ErrorCodes::UNFINISHED); ErrorCodes::UNFINISHED);
@ -4541,7 +4533,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
/// Speedup removing of replaced parts from filesystem /// Speedup removing of replaced parts from filesystem
parts_to_remove.clear(); parts_to_remove.clear();
cleanup_thread->schedule(); cleanup_thread.wakeup();
/// If necessary, wait until the operation is performed on all replicas. /// If necessary, wait until the operation is performed on all replicas.
if (context.getSettingsRef().replication_alter_partitions_sync > 1) if (context.getSettingsRef().replication_alter_partitions_sync > 1)
@ -4644,7 +4636,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
if (cond_reached) if (cond_reached)
break; break;
if (shutdown_called) if (partial_shutdown_called)
throw Exception("Shutdown is called for table", ErrorCodes::ABORTED); throw Exception("Shutdown is called for table", ErrorCodes::ABORTED);
} }


@ -264,8 +264,9 @@ private:
Poco::Event startup_event; Poco::Event startup_event;
/// Do I need to complete background threads (except restarting_thread)? /// Do I need to complete background threads (except restarting_thread)?
std::atomic<bool> shutdown_called {false}; std::atomic<bool> partial_shutdown_called {false};
Poco::Event shutdown_event; /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
Poco::Event partial_shutdown_event {Poco::Event::EVENT_MANUALRESET};
/// Limiting parallel fetches per one table /// Limiting parallel fetches per one table
std::atomic_uint current_table_fetches {0}; std::atomic_uint current_table_fetches {0};
@ -283,25 +284,24 @@ private:
/// A task that selects parts to merge. /// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task; BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// A task that marks finished mutations as done. /// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task; BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// A thread that removes old parts, log entries, and blocks. /// A thread that removes old parts, log entries, and blocks.
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread; ReplicatedMergeTreeCleanupThread cleanup_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
/// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes.
std::unique_ptr<ReplicatedMergeTreeAlterThread> alter_thread; ReplicatedMergeTreeAlterThread alter_thread;
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked. /// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread; ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
/// An event that awakens `alter` method from waiting for the completion of the ALTER query. /// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>(); zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();


@ -29,14 +29,14 @@ ColumnsDescription getStructureOfRemoteTable(
{ {
/// Send to the first any remote shard. /// Send to the first any remote shard.
const auto & shard_info = cluster.getAnyShardInfo(); const auto & shard_info = cluster.getAnyShardInfo();
String query; String query;
if (table_func_ptr) if (table_func_ptr)
{ {
if (shard_info.isLocal()) if (shard_info.isLocal())
{ {
auto table_function = static_cast<ASTFunction *>(table_func_ptr.get()); auto table_function = static_cast<const ASTFunction *>(table_func_ptr.get());
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns(); return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns();
} }


@ -42,7 +42,7 @@ public:
TableFunctionPtr get( TableFunctionPtr get(
const std::string & name, const std::string & name,
const Context & context) const; const Context & context) const;
bool isTableFunctionName(const std::string & name) const; bool isTableFunctionName(const std::string & name) const;
const TableFunctions & getAllTableFunctions() const const TableFunctions & getAllTableFunctions() const


@ -231,17 +231,18 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
++arg_num; ++arg_num;
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
const auto table_function = static_cast<ASTFunction *>(args[arg_num].get()); const auto function = typeid_cast<const ASTFunction *>(args[arg_num].get());
if (TableFunctionFactory::instance().isTableFunctionName(table_function->name)) if (function && TableFunctionFactory::instance().isTableFunctionName(function->name))
{ {
remote_table_function_ptr = args[arg_num]; remote_table_function_ptr = args[arg_num];
++arg_num; ++arg_num;
} }
else { else
{
remote_database = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>(); remote_database = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
++arg_num; ++arg_num;
size_t dot = remote_database.find('.'); size_t dot = remote_database.find('.');
@ -254,12 +255,13 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
else else
{ {
if (arg_num >= args.size()) if (arg_num >= args.size())
{
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
else else
{ {
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_table = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>(); remote_table = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
remote_database = remote_database;
++arg_num; ++arg_num;
} }
} }
@ -315,15 +317,14 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
} }
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr); auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr);
StoragePtr res = remote_table_function_ptr ? StoragePtr res = remote_table_function_ptr
StorageDistributed::createWithOwnCluster( ? StorageDistributed::createWithOwnCluster(
getName(), getName(),
structure_remote_table, structure_remote_table,
remote_table_function_ptr, remote_table_function_ptr,
cluster, cluster,
context) context)
: StorageDistributed::createWithOwnCluster( : StorageDistributed::createWithOwnCluster(
getName(), getName(),
structure_remote_table, structure_remote_table,
@ -336,6 +337,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
return res; return res;
} }
TableFunctionRemote::TableFunctionRemote(const std::string & name_) TableFunctionRemote::TableFunctionRemote(const std::string & name_)
: name(name_) : name(name_)
{ {


@ -195,7 +195,7 @@ def main(args):
stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr' stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr'
if ext == '.sql': if ext == '.sql':
command = "{0} --multiquery < {1} > {2} 2> {3}".format(args.client, case_file, stdout_file, stderr_file) command = "{0} --testmode --multiquery < {1} > {2} 2> {3}".format(args.client, case_file, stdout_file, stderr_file)
else: else:
command = "{0} > {1} 2> {2}".format(case_file, stdout_file, stderr_file) command = "{0} > {1} 2> {2}".format(case_file, stdout_file, stderr_file)


@ -19,7 +19,7 @@ class Client:
command = self.command[:] command = self.command[:]
if stdin is None: if stdin is None:
command += ['--multiquery'] command += ['--multiquery', '--testmode']
stdin = sql stdin = sql
else: else:
command += ['--query', sql] command += ['--query', sql]


@ -20,8 +20,16 @@ from .client import Client, CommandRequest
HELPERS_DIR = p.dirname(__file__) HELPERS_DIR = p.dirname(__file__)
DEFAULT_ENV_NAME = 'env_file'
def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
full_path = os.path.join(path, fname)
with open(full_path, 'w') as f:
for var, value in variables.items():
f.write("=".join([var, value]) + "\n")
return full_path
class ClickHouseCluster: class ClickHouseCluster:
"""ClickHouse cluster with several instances and (possibly) ZooKeeper. """ClickHouse cluster with several instances and (possibly) ZooKeeper.
@ -55,12 +63,12 @@ class ClickHouseCluster:
self.with_zookeeper = False self.with_zookeeper = False
self.with_mysql = False self.with_mysql = False
self.with_kafka = False self.with_kafka = False
self.docker_client = None self.docker_client = None
self.is_up = False self.is_up = False
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, hostname=None): def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, hostname=None, env_variables={}):
"""Add an instance to the cluster. """Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@ -78,7 +86,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance( instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper, self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname) self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname, env_variables=env_variables)
self.instances[name] = instance self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path]) self.base_cmd.extend(['--file', instance.docker_compose_path])
@ -87,7 +95,7 @@ class ClickHouseCluster:
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]) self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')] self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
if with_mysql and not self.with_mysql: if with_mysql and not self.with_mysql:
self.with_mysql = True self.with_mysql = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]) self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
@ -219,13 +227,15 @@ services:
- --log-file=/var/log/clickhouse-server/clickhouse-server.log - --log-file=/var/log/clickhouse-server/clickhouse-server.log
- --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log - --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log
depends_on: {depends_on} depends_on: {depends_on}
env_file:
- {env_file}
''' '''
class ClickHouseInstance: class ClickHouseInstance:
def __init__( def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros, self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None): with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None, env_variables={}):
self.name = name self.name = name
self.base_cmd = cluster.base_cmd[:] self.base_cmd = cluster.base_cmd[:]
@ -249,6 +259,7 @@ class ClickHouseInstance:
self.path = p.join(self.cluster.instances_dir, name) self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml') self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
self.env_variables = env_variables
self.docker_client = None self.docker_client = None
self.ip_address = None self.ip_address = None
@ -396,6 +407,8 @@ class ClickHouseInstance:
depends_on.append("zoo2") depends_on.append("zoo2")
depends_on.append("zoo3") depends_on.append("zoo3")
env_file = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)
with open(self.docker_compose_path, 'w') as docker_compose: with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format( docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
name=self.name, name=self.name,
@ -406,7 +419,8 @@ class ClickHouseInstance:
config_d_dir=config_d_dir, config_d_dir=config_d_dir,
db_dir=db_dir, db_dir=db_dir,
logs_dir=logs_dir, logs_dir=logs_dir,
depends_on=str(depends_on))) depends_on=str(depends_on),
env_file=env_file))
def destroy_dir(self): def destroy_dir(self):


@ -0,0 +1,14 @@
<yandex>
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" />
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
</yandex>


@ -0,0 +1,15 @@
<yandex>
<include_from>/etc/clickhouse-server/config.d/max_query_size.xml</include_from>
<profiles>
<default>
<max_query_size incl="mqs" />
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
</yandex>


@ -0,0 +1,14 @@
<yandex>
<profiles>
<default>
<max_query_size>33333</max_query_size>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
</yandex>


@ -0,0 +1,14 @@
<yandex>
<profiles>
<default>
<max_query_size from_zk="/setting/max_query_size" />
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
</yandex>


@ -0,0 +1,3 @@
<yandex>
<mqs>99999</mqs>
</yandex>


@ -0,0 +1,28 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config_no_substs.xml']) # hardcoded value 33333
node2 = cluster.add_instance('node2', main_configs=['configs/config_env.xml'], env_variables={"MAX_QUERY_SIZE": "55555"})
node3 = cluster.add_instance('node3', main_configs=['configs/config_zk.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', main_configs=['configs/config_incl.xml', 'configs/max_query_size.xml']) # include value 77777
@pytest.fixture(scope="module")
def start_cluster():
try:
def create_zk_roots(zk):
zk.create(path="/setting/max_query_size", value="77777", makepath=True)
cluster.add_zookeeper_startup_command(create_zk_roots)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_config(start_cluster):
assert node1.query("select value from system.settings where name = 'max_query_size'") == "33333\n"
assert node2.query("select value from system.settings where name = 'max_query_size'") == "55555\n"
assert node3.query("select value from system.settings where name = 'max_query_size'") == "77777\n"
assert node4.query("select value from system.settings where name = 'max_query_size'") == "99999\n"


@ -14,7 +14,7 @@ def _fill_nodes(nodes, shard):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}', date, id, 8192); ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}', date, id, 8192);
'''.format(shard=shard, replica=node.name)) '''.format(shard=shard, replica=node.name))
cluster = ClickHouseCluster(__file__, server_bin_path="/home/alesap/ClickHouse/dbms/programs/clickhouse-server") cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True) node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True) node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)


@ -1 +1,12 @@
SELECT * FROM system.numbers LIMIT 10 SELECT * FROM system.numbers LIMIT 3;
SELECT sys_num.number FROM system.numbers AS sys_num WHERE number > 2 LIMIT 2;
SELECT number FROM system.numbers WHERE number >= 5 LIMIT 2;
SELECT * FROM system.numbers WHERE number == 7 LIMIT 1;
SELECT number AS n FROM system.numbers WHERE number IN(8, 9) LIMIT 2;
select number from system.numbers limit 0;
select x from system.numbers limit 1; -- { clientError 0 serverError 47 }
SELECT x, number FROM system.numbers LIMIT 1; -- { serverError 47 }
SELECT * FROM system.number LIMIT 1; -- { serverError 60 }
SELECT * FROM system LIMIT 1; -- { serverError 60 }
SELECT * FROM numbers LIMIT 1; -- { serverError 60 }
SELECT sys.number FROM system.numbers AS sys_num LIMIT 1; -- { serverError 47 }
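The `--testmode` switch added to the test runner and integration client above enables hints such as `-- { serverError 47 }`, which tell the client which error a query is expected to fail with. A minimal Python sketch of parsing such a hint (illustrative only, not the actual clickhouse-test code):

```python
import re

def expected_errors(query):
    """Extract a test hint like '-- { clientError 0 serverError 47 }' from a query.

    Illustrative sketch only; the real parsing lives in the client/test runner.
    """
    match = re.search(r"--\s*\{([^}]*)\}", query)
    if not match:
        return {}
    return {kind: int(code)
            for kind, code in re.findall(r"(clientError|serverError)\s+(\d+)", match.group(1))}

print(expected_errors("SELECT x FROM system.numbers LIMIT 1; -- { clientError 0 serverError 47 }"))
# {'clientError': 0, 'serverError': 47}
```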


@ -1 +1,3 @@
['Hello','Goodbye'] ['Hello','Goodbye']
['Hello'] ['Goodbye']
[]


@ -1 +1,3 @@
SELECT ['Hello', 'Goodbye'] SELECT ['Hello', 'Goodbye'];
SELECT ['Hello'], ['Goodbye'];
SELECT [];


@ -28,3 +28,8 @@ x Nullable(String)
1 UInt8 1 UInt8
1 UInt8 1 UInt8
\N Nullable(Nothing) \N Nullable(Nothing)
0 Nullable(String)
-1 Nullable(String)
2 Nullable(String)
3 Nullable(String)
4 Nullable(String)


@ -17,3 +17,5 @@ SELECT ifNull(nullIf(toString(number), '1'), nullIf(toString(-number), '-3')) AS
SELECT ifNull(NULL, 1) AS res, toTypeName(res); SELECT ifNull(NULL, 1) AS res, toTypeName(res);
SELECT ifNull(1, NULL) AS res, toTypeName(res); SELECT ifNull(1, NULL) AS res, toTypeName(res);
SELECT ifNull(NULL, NULL) AS res, toTypeName(res); SELECT ifNull(NULL, NULL) AS res, toTypeName(res);
SELECT IFNULL(NULLIF(toString(number), '1'), NULLIF(toString(-number), '-3')) AS res, toTypeName(res) FROM system.numbers LIMIT 5;


@ -1,4 +1,5 @@
\N \N \N 1 1 1 1 \N \N \N 1 1 1 1
\N \N 1
0 Nullable(UInt64) 0 Nullable(UInt64)
\N Nullable(UInt64) \N Nullable(UInt64)
2 Nullable(UInt64) 2 Nullable(UInt64)


@ -1,6 +1,8 @@
SELECT coalesce(), coalesce(NULL), coalesce(NULL, NULL), SELECT coalesce(), coalesce(NULL), coalesce(NULL, NULL),
coalesce(1), coalesce(1, NULL), coalesce(NULL, 1), coalesce(NULL, 1, NULL); coalesce(1), coalesce(1, NULL), coalesce(NULL, 1), coalesce(NULL, 1, NULL);
SELECT COALESCE(), COALESCE(NULL), COALESCE(1, NULL);
SELECT coalesce(number % 2 = 0 ? number : NULL, number % 3 = 0 ? number : NULL, number % 5 = 0 ? number : NULL) AS res, toTypeName(res) FROM system.numbers LIMIT 15; SELECT coalesce(number % 2 = 0 ? number : NULL, number % 3 = 0 ? number : NULL, number % 5 = 0 ? number : NULL) AS res, toTypeName(res) FROM system.numbers LIMIT 15;
SELECT coalesce(number % 2 = 0 ? number : NULL, number % 3 = 0 ? number : NULL, number) AS res, toTypeName(res) FROM system.numbers LIMIT 15; SELECT coalesce(number % 2 = 0 ? number : NULL, number % 3 = 0 ? number : NULL, number) AS res, toTypeName(res) FROM system.numbers LIMIT 15;
SELECT coalesce(number % 2 = 0 ? number : NULL, number % 3 = 0 ? number : NULL, 100) AS res, toTypeName(res) FROM system.numbers LIMIT 15; SELECT coalesce(number % 2 = 0 ? number : NULL, number % 3 = 0 ? number : NULL, 100) AS res, toTypeName(res) FROM system.numbers LIMIT 15;


@ -0,0 +1,7 @@
1 1
0 1
1 1
0 1
1 1
0 8
1 2


@ -0,0 +1,12 @@
SELECT dummy IN (0) AS x, count() GROUP BY x;
SELECT 1 IN (0) AS x, count() GROUP BY x;
SELECT 0 IN (0) AS x, count() GROUP BY x;
SELECT materialize(1) IN (0) AS x, count() GROUP BY x;
SELECT materialize(0) IN (0) AS x, count() GROUP BY x;
SELECT
number IN (1, 2) AS x,
count()
FROM numbers(10)
GROUP BY x;


@ -0,0 +1 @@
4 ['hello','world'] hello


@ -0,0 +1 @@
WITH arrayJoin(['hello', 'world']) AS s SELECT count(), arraySort(groupUniqArray(s)), anyHeavy(s) FROM remote('127.0.0.{2,3}', system.one);


@ -0,0 +1,15 @@
623211862
3533626746
2388617433
2708309598
2414502773
670491991
0
0
0
0
0
0
0
1
1


@ -0,0 +1,17 @@
SELECT murmurHash2_32(123456);
SELECT murmurHash2_32(CAST(3 AS UInt8));
SELECT murmurHash2_32(CAST(1.2684 AS Float32));
SELECT murmurHash2_32(CAST(-154477 AS Int64));
SELECT murmurHash2_32('foo');
SELECT murmurHash2_32(CAST('bar' AS FixedString(3)));
SELECT murmurHash2_32(x) FROM (SELECT CAST(1 AS Enum8('a' = 1, 'b' = 2)) as x);
SELECT murmurHash2_32('');
SELECT murmurHash2_32('\x01');
SELECT murmurHash2_32('\x02\0');
SELECT murmurHash2_32('\x03\0\0');
SELECT murmurHash2_32(1);
SELECT murmurHash2_32(toUInt16(2));
SELECT murmurHash2_32(2) = bitXor(toUInt32(0x5bd1e995 * bitXor(toUInt32(3 * 0x5bd1e995) AS a, bitShiftRight(a, 13))) AS b, bitShiftRight(b, 15));
SELECT murmurHash2_32('\x02') = bitXor(toUInt32(0x5bd1e995 * bitXor(toUInt32(3 * 0x5bd1e995) AS a, bitShiftRight(a, 13))) AS b, bitShiftRight(b, 15));
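The last two queries above check `murmurHash2_32` of a one-byte value against the same mix spelled out with `bitXor`/`bitShiftRight`. A minimal Python sketch of that arithmetic, assuming the function applies the standard MurmurHash2 tail and finalization with seed 0 to the single byte `0x02` (illustrative, not the library code):

```python
M = 0x5BD1E995          # MurmurHash2 multiplier
MASK = 0xFFFFFFFF       # keep arithmetic in 32 bits

# Left-hand side: hash of the single byte 0x02 with seed 0.
h = (0 ^ 1) ^ 0x02      # seed ^ len, then XOR in the 1-byte tail -> 3
h = (h * M) & MASK      # tail multiply
h ^= h >> 13            # finalization mix
h = (h * M) & MASK
h ^= h >> 15

# Right-hand side as written in the SQL test.
a = (3 * M) & MASK
b = (M * (a ^ (a >> 13))) & MASK
rhs = b ^ (b >> 15)

print(h == rhs, hex(h))  # True, so both SQL comparisons return 1
```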


@ -0,0 +1,3 @@
1 10000
1 10000
1 10000


@ -0,0 +1,7 @@
DROP TABLE IF EXISTS test.remote_test;
CREATE TABLE test.remote_test(uid String, its UInt32, action_code String, day Date) ENGINE = MergeTree(day, (uid, its), 8192);
INSERT INTO test.remote_test SELECT toString(number) AS uid, number % 3 AS its, toString(number % 3) AS action_code, '2000-01-01' FROM system.numbers LIMIT 10000;
SELECT level, COUNT() FROM (SELECT uid, windowFunnel(3600)(toUInt32(its), action_code != '', action_code = '2') AS level FROM remote('127.0.0.{2,3}', test.remote_test) GROUP BY uid) GROUP BY level;
SELECT level, COUNT() FROM (SELECT uid, windowFunnel(3600)(toUInt32(its), action_code != '', action_code = '2') AS level FROM remote('127.0.0.{2,3}', test.remote_test) GROUP BY uid) GROUP BY level;
SELECT level, COUNT() FROM (SELECT uid, windowFunnel(3600)(toUInt32(its), action_code != '', action_code = '2') AS level FROM remote('127.0.0.{2,3}', test.remote_test) GROUP BY uid) GROUP BY level;
DROP TABLE IF EXISTS test.remote_test;

debian/changelog

@ -1,5 +1,5 @@
clickhouse (18.2.0) unstable; urgency=low clickhouse (18.4.0) unstable; urgency=low
* Modified source code * Modified source code
-- <robot-metrika-test@yandex-team.ru> Mon, 23 Jul 2018 22:38:09 +0300 -- <robot-metrika-test@yandex-team.ru> Sat, 28 Jul 2018 00:35:05 +0300


@ -32,8 +32,6 @@ See `dbms/tests/integration/README.md` on how to run these tests.
Note that integration of ClickHouse with third-party drivers is not tested. Also we currently don't have integration tests with our JDBC and ODBC drivers. Note that integration of ClickHouse with third-party drivers is not tested. Also we currently don't have integration tests with our JDBC and ODBC drivers.
We don't have integration tests for `Kafka` table engine that is developed by community - this is one of the most anticipated tests (otherwise there is almost no way to be confident with `Kafka` tables).
## Unit Tests ## Unit Tests

docs/en/faq/general.md

@ -0,0 +1,12 @@
# General questions
## Why not use something like MapReduce?
We can refer to systems like MapReduce as distributed computing systems in which the reduce operation is based on a distributed sort. The most common open-source solution of this kind is [Apache Hadoop](http://hadoop.apache.org), while Yandex internally uses its own MapReduce implementation, YT.
The systems of this kind are not suitable for online queries due to their high latency. In other words, they can't be used as the back-end for a web interface.
Distributed sorting isn't the best way to perform reduce operations if the result of the operation and all the intermediate results (if there are any) are located in the RAM of a single server, which is usually the case for online queries. In such a case, a hash table is the optimal way to perform reduce operations. A common approach to optimizing MapReduce tasks is pre-aggregation (partial reduce) using a hash table in RAM. The user performs this optimization manually.
Distributed sorting is one of the main causes of reduced performance when running simple MapReduce tasks.
Most MapReduce implementations allow executing arbitrary code on the cluster. But a declarative query language is better suited to OLAP because it lets you run experiments quickly. For example, Hadoop has Hive and Pig. Also consider Cloudera Impala and Shark (outdated) for Spark, as well as Spark SQL, Presto, and Apache Drill. Performance when running such tasks is highly sub-optimal compared to specialized systems, and their relatively high latency makes it unrealistic to use these systems as the backend for a web interface.
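As a toy illustration of the pre-aggregation (partial reduce) idea described above, with purely hypothetical data and function names:

```python
from collections import defaultdict

def partial_reduce(rows):
    """Pre-aggregate one shard in RAM with a hash table (partial reduce)."""
    acc = defaultdict(int)
    for key, value in rows:
        acc[key] += value
    return acc

def merge(partials):
    """Merge the small per-shard partials instead of sorting all rows."""
    total = defaultdict(int)
    for part in partials:
        for key, value in part.items():
            total[key] += value
    return total

shard_a = [("ru", 1), ("en", 1), ("ru", 1)]
shard_b = [("en", 1), ("de", 1)]
print(dict(merge([partial_reduce(shard_a), partial_reduce(shard_b)])))
# {'ru': 2, 'en': 2, 'de': 1}
```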


@ -360,9 +360,8 @@ We ran queries using a client located in a Yandex datacenter in Finland on a clu
## Summary

| nodes | Q1    | Q2    | Q3    | Q4    |
| ----- | ----- | ----- | ----- | ----- |
| 1     | 0.490 | 1.224 | 2.104 | 3.593 |
| 3     | 0.212 | 0.438 | 0.733 | 1.241 |
| 140   | 0.028 | 0.043 | 0.051 | 0.072 |


@ -2,15 +2,6 @@
# OnTime # OnTime
This performance test was created by Vadim Tkachenko. See:
- <https://www.percona.com/blog/2009/10/02/analyzing-air-traffic-performance-with-infobright-and-monetdb/>
- <https://www.percona.com/blog/2009/10/26/air-traffic-queries-in-luciddb/>
- <https://www.percona.com/blog/2009/11/02/air-traffic-queries-in-infinidb-early-alpha/>
- <https://www.percona.com/blog/2014/04/21/using-apache-hadoop-and-impala-together-with-mysql-for-data-analysis/>
- <https://www.percona.com/blog/2016/01/07/apache-spark-with-air-ontime-performance-data/>
- <http://nickmakos.blogspot.ru/2012/08/analyzing-air-traffic-performance-with.html>
Downloading data: Downloading data:
```bash ```bash
@ -316,3 +307,12 @@ SELECT OriginCityName, DestCityName, count() AS c FROM ontime GROUP BY OriginCit
SELECT OriginCityName, count() AS c FROM ontime GROUP BY OriginCityName ORDER BY c DESC LIMIT 10; SELECT OriginCityName, count() AS c FROM ontime GROUP BY OriginCityName ORDER BY c DESC LIMIT 10;
``` ```
This performance test was created by Vadim Tkachenko. For more details see:
- <https://www.percona.com/blog/2009/10/02/analyzing-air-traffic-performance-with-infobright-and-monetdb/>
- <https://www.percona.com/blog/2009/10/26/air-traffic-queries-in-luciddb/>
- <https://www.percona.com/blog/2009/11/02/air-traffic-queries-in-infinidb-early-alpha/>
- <https://www.percona.com/blog/2014/04/21/using-apache-hadoop-and-impala-together-with-mysql-for-data-analysis/>
- <https://www.percona.com/blog/2016/01/07/apache-spark-with-air-ontime-performance-data/>
- <http://nickmakos.blogspot.ru/2012/08/analyzing-air-traffic-performance-with.html>


@ -64,7 +64,11 @@ Comma Separated Values format ([RFC](https://tools.ietf.org/html/rfc4180)).
When formatting, rows are enclosed in double quotes. A double quote inside a string is output as two double quotes in a row. There are no other rules for escaping characters. Date and date-time are enclosed in double quotes. Numbers are output without quotes. Values are separated by a delimiter&ast;. Rows are separated using the Unix line feed (LF). Arrays are serialized in CSV as follows: first the array is serialized to a string as in TabSeparated format, and then the resulting string is output to CSV in double quotes. Tuples in CSV format are serialized as separate columns (that is, their nesting in the tuple is lost). When formatting, rows are enclosed in double quotes. A double quote inside a string is output as two double quotes in a row. There are no other rules for escaping characters. Date and date-time are enclosed in double quotes. Numbers are output without quotes. Values are separated by a delimiter&ast;. Rows are separated using the Unix line feed (LF). Arrays are serialized in CSV as follows: first the array is serialized to a string as in TabSeparated format, and then the resulting string is output to CSV in double quotes. Tuples in CSV format are serialized as separate columns (that is, their nesting in the tuple is lost).
```
clickhouse-client --format_csv_delimiter="|" --query="INSERT INTO test.csv FORMAT CSV" < data.csv
```

&ast;By default — `,`. See the [format_csv_delimiter](/operations/settings/settings/#format_csv_delimiter) setting for additional info.
When parsing, all values can be parsed either with or without quotes. Both double and single quotes are supported. Rows can also be arranged without quotes. In this case, they are parsed up to a delimiter or line feed (CR or LF). In violation of the RFC, when parsing rows without quotes, the leading and trailing spaces and tabs are ignored. For the line feed, Unix (LF), Windows (CR LF) and Mac OS Classic (CR LF) are all supported.
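For illustration, a small Python sketch (hypothetical data, not part of ClickHouse) that writes pipe-delimited CSV suitable for the `clickhouse-client --format_csv_delimiter="|"` invocation shown above:

```python
import csv
import sys

# Write pipe-delimited CSV to stdout; values containing the delimiter or
# quotes are enclosed in double quotes (QUOTE_MINIMAL). ClickHouse itself
# accepts both quoted and unquoted values when parsing, as described above.
rows = [
    (1, "hello, world", "2018-07-28"),
    (2, 'say "hi"', "2018-07-29"),
]
writer = csv.writer(sys.stdout, delimiter="|", quoting=csv.QUOTE_MINIMAL)
writer.writerows(rows)
```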


@ -1,6 +1,6 @@
# ClickHouse features that can be considered disadvantages # ClickHouse features that can be considered disadvantages
1. No full-fledged transactions. 1. No full-fledged transactions.
2. Lack of ability to modify or delete already inserted data with high rate and low latency. There are batch deletes available to clean up data that is not needed anymore or to comply with [GDPR](https://gdpr-info.eu). Batch updates are in development as of July 2018. 2. Lack of ability to modify or delete already inserted data with high rate and low latency. There are batch deletes available to clean up data that is not needed anymore or to comply with [GDPR](https://gdpr-info.eu). Batch updates are currently in development as of July 2018.
3. Sparse index makes ClickHouse not really suitable for point queries retrieving single rows by their keys. 3. Sparse index makes ClickHouse not really suitable for point queries retrieving single rows by their keys.


@ -1,22 +1,24 @@
# Performance

According to internal testing results by Yandex, ClickHouse shows the best performance for comparable operating scenarios among systems of its class that were available for testing. This includes the highest throughput for long queries and the lowest latency on short queries. Testing results are shown on a [separate page](https://clickhouse.yandex/benchmark.html).

There are a lot of independent benchmarks that confirm this as well. You can find them on your own, or start from this small [collection of independent benchmark links](https://clickhouse.yandex/#independent-benchmarks).

## Throughput for a single large query

Throughput can be measured in rows per second or in megabytes per second. If the data is placed in the page cache, a query that is not too complex is processed on modern hardware at a speed of approximately 2-10 GB/s of uncompressed data on a single server (for the simplest cases, the speed may reach 30 GB/s). If data is not placed in the page cache, the speed is bound by the disk subsystem and how well the data has been compressed. For example, if the disk subsystem allows reading data at 400 MB/s and the data compression rate is 3, the speed will be around 1.2 GB/s. To get the speed in rows per second, divide the speed in bytes per second by the total size of the columns used in the query. For example, if 10 bytes of columns are extracted, the speed will be around 100-200 million rows per second.
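A back-of-the-envelope version of the arithmetic above (illustrative numbers only):

```python
disk_read_mb_s = 400        # disk subsystem read speed
compression_ratio = 3       # data compression rate
bytes_per_row = 10          # total size of the columns used by the query

uncompressed_gb_s = disk_read_mb_s * compression_ratio / 1000    # ~1.2 GB/s
rows_per_second = uncompressed_gb_s * 1e9 / bytes_per_row        # ~120 million rows/s
print(uncompressed_gb_s, rows_per_second)
```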
The processing speed increases almost linearly for distributed processing, but only if the number of rows resulting from aggregation or sorting is not too large.

## Latency when processing short queries

If a query uses a primary key and does not select too many rows to process (hundreds of thousands), and does not use too many columns, we can expect less than 50 milliseconds of latency (single digits of milliseconds in the best case) if data is placed in the page cache. Otherwise, latency is calculated from the number of seeks. If you use rotating drives, for a system that is not overloaded, the approximate latency can be calculated by this formula: seek time (10 ms) \* number of columns queried \* number of data parts.
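The seek-based latency formula above, as a small worked example (the column and part counts are hypothetical):

```python
seek_time_s = 0.010      # ~10 ms per seek on a rotating drive
columns_queried = 5      # hypothetical
data_parts = 4           # hypothetical

latency_s = seek_time_s * columns_queried * data_parts
print(latency_s)         # 0.2 s
```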
## Throughput when processing a large quantity of short queries

Under the same circumstances, ClickHouse can handle several hundred queries per second on a single server (up to several thousand in the best case). Since this scenario is not typical for analytical DBMSs, it is better to expect a maximum of hundreds of queries per second.

## Performance when inserting data

It is recommended to insert data in batches of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed will be from 50 to 200 MB/s. If the inserted rows are around 1 Kb in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance will be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, and performance will increase linearly.


@ -1,15 +0,0 @@
# Questions you were afraid to ask
## Why not use something like MapReduce?
We can refer to systems like map-reduce as distributed computing systems in which the reduce operation is based on distributed sorting. In this sense, they include Hadoop, and YT (YT is developed at Yandex for internal use).
These systems aren't appropriate for online queries due to their high latency. In other words, they can't be used as the back-end for a web interface.
These types of systems aren't useful for real-time data updates.
Distributed sorting isn't the best way to perform reduce operations if the result of the operation and all the intermediate results (if there are any) are located in the RAM of a single server, which is usually the case for online queries. In such a case, a hash table is the optimal way to perform reduce operations. A common approach to optimizing map-reduce tasks is pre-aggregation (partial reduce) using a hash table in RAM. The user performs this optimization manually.
Distributed sorting is one of the main causes of reduced performance when running simple map-reduce tasks.
Systems like map-reduce allow executing any code on the cluster. But a declarative query language is better suited to OLAP in order to run experiments quickly. For example, Hadoop has Hive and Pig. Also consider Cloudera Impala, Shark (outdated) for Spark, and Spark SQL, Presto, and Apache Drill. Performance when running such tasks is highly sub-optimal compared to specialized systems, but relatively high latency makes it unrealistic to use these systems as the backend for a web interface.
YT allows storing groups of columns separately. But YT can't be considered a true column-based system because it doesn't have fixed-length data types (for efficiently storing numbers without extra "garbage"), and also due to its lack of a vector engine. Tasks are performed in YT using custom code in streaming mode, so they cannot be optimized enough (up to hundreds of millions of rows per second per server). "Dynamic table sorting" is under development in YT using MergeTree, strict value typing, and a query language similar to SQL. Dynamically sorted tables are not appropriate for OLAP tasks because the data is stored by row. The YT query language is still under development, so we can't yet rely on this functionality. YT developers are considering using dynamically sorted tables in OLTP and Key-Value scenarios.


@ -1,9 +1,10 @@
# Yandex.Metrica use case

ClickHouse was initially developed to power [Yandex.Metrica](https://metrica.yandex.com/), [the second largest web analytics platform in the world](http://w3techs.com/technologies/overview/traffic_analysis/all), and continues to be its core component. With more than 13 trillion records in the database and more than 20 billion events daily, ClickHouse allows generating custom reports on the fly directly from non-aggregated data. This article gives a historical background on what the main goal of ClickHouse was before it became an open-source product.

Yandex.Metrica generates custom reports based on hits and sessions on the fly, with arbitrary segments and time periods chosen by the end user. Complex aggregates are often required, such as the number of unique visitors. New data for the reports arrives in real-time.

As of April 2014, Yandex.Metrica received approximately 12 billion events (page views and clicks) daily. All these events must be stored in order to build those custom reports. A single query may require scanning millions of rows in no more than a few hundred milliseconds, or hundreds of millions of rows over a few seconds.

## Usage in Yandex.Metrica and other Yandex services


@ -156,34 +156,36 @@ Configuring `/etc/odbc.ini` (or `~/.odbc.ini`):
The dictionary configuration in ClickHouse: The dictionary configuration in ClickHouse:
```xml
<yandex>
    <dictionary>
        <name>table_name</name>
        <source>
            <odbc>
                <!-- You can specify the following parameters in connection_string: -->
                <!-- DSN=myconnection;UID=username;PWD=password;HOST=127.0.0.1;PORT=5432;DATABASE=my_db -->
                <connection_string>DSN=myconnection</connection_string>
                <table>postgresql_table</table>
            </odbc>
        </source>
        <lifetime>
            <min>300</min>
            <max>360</max>
        </lifetime>
        <layout>
            <hashed/>
        </layout>
        <structure>
            <id>
                <name>id</name>
            </id>
            <attribute>
                <name>some_column</name>
                <type>UInt64</type>
                <null_value>0</null_value>
            </attribute>
        </structure>
    </dictionary>
</yandex>
```
You may need to edit `odbc.ini` to specify the full path to the library with the driver `DRIVER=/usr/local/lib/psqlodbcw.so`. You may need to edit `odbc.ini` to specify the full path to the library with the driver `DRIVER=/usr/local/lib/psqlodbcw.so`.

docs/ru/faq/general.md

@ -0,0 +1,12 @@
# General questions

## Why not use something like MapReduce?

By MapReduce-like systems we mean distributed computing systems in which the reduce operation is implemented on top of a distributed sort. The most common open-source solution of this class is [Apache Hadoop](http://hadoop.apache.org), while Yandex uses its in-house development, YT.

Such systems are not suitable for online queries because of their high latency. In other words, they cannot be used as the backend for a web interface.

Such systems are not suitable for real-time data updates.

Distributed sorting is not the optimal way to perform a reduce operation if the result of the operation and all intermediate results (if any) fit into the RAM of a single server, which is usually the case for queries executed online. In that case, a hash table is the optimal way to perform the reduce operation. A common way to optimize MapReduce tasks is pre-aggregation (partial reduce) using an in-memory hash table. The user performs this optimization manually.

Distributed sorting is the main cause of slowdowns when running simple MapReduce tasks.

Most MapReduce implementations allow executing arbitrary code on the cluster. But a declarative query language is better suited to OLAP because it allows running studies quickly. For example, Hadoop has Hive and Pig. Also consider Cloudera Impala and Shark (outdated) for Spark, as well as Spark SQL, Presto, and Apache Drill. However, performance when running such tasks is highly sub-optimal compared to specialized systems, and their relatively high latency makes it impossible to use these systems as the backend for a web interface.


@ -361,9 +361,8 @@ Q4: 0.072 sec.
## Summary

| servers | Q1    | Q2    | Q3    | Q4    |
| ------- | ----- | ----- | ----- | ----- |
| 1       | 0.490 | 1.224 | 2.104 | 3.593 |
| 3       | 0.212 | 0.438 | 0.733 | 1.241 |
| 140     | 0.028 | 0.043 | 0.051 | 0.072 |


@ -2,15 +2,6 @@
# OnTime # OnTime
This performance test was created by Vadim Tkachenko. See:
- <https://www.percona.com/blog/2009/10/02/analyzing-air-traffic-performance-with-infobright-and-monetdb/>
- <https://www.percona.com/blog/2009/10/26/air-traffic-queries-in-luciddb/>
- <https://www.percona.com/blog/2009/11/02/air-traffic-queries-in-infinidb-early-alpha/>
- <https://www.percona.com/blog/2014/04/21/using-apache-hadoop-and-impala-together-with-mysql-for-data-analysis/>
- <https://www.percona.com/blog/2016/01/07/apache-spark-with-air-ontime-performance-data/>
- <http://nickmakos.blogspot.ru/2012/08/analyzing-air-traffic-performance-with.html>
Downloading the data:
```bash ```bash
@ -316,3 +307,12 @@ SELECT OriginCityName, DestCityName, count() AS c FROM ontime GROUP BY OriginCit
SELECT OriginCityName, count() AS c FROM ontime GROUP BY OriginCityName ORDER BY c DESC LIMIT 10; SELECT OriginCityName, count() AS c FROM ontime GROUP BY OriginCityName ORDER BY c DESC LIMIT 10;
``` ```
This performance test was created by Vadim Tkachenko. Related articles:
- <https://www.percona.com/blog/2009/10/02/analyzing-air-traffic-performance-with-infobright-and-monetdb/>
- <https://www.percona.com/blog/2009/10/26/air-traffic-queries-in-luciddb/>
- <https://www.percona.com/blog/2009/11/02/air-traffic-queries-in-infinidb-early-alpha/>
- <https://www.percona.com/blog/2014/04/21/using-apache-hadoop-and-impala-together-with-mysql-for-data-analysis/>
- <https://www.percona.com/blog/2016/01/07/apache-spark-with-air-ontime-performance-data/>
- <http://nickmakos.blogspot.ru/2012/08/analyzing-air-traffic-performance-with.html>


@ -66,6 +66,12 @@ struct Message {
When formatting, strings are output in double quotes. A double quote inside a string is output as two double quotes in a row. There are no other escaping rules. Dates and dates-with-times are output in double quotes. Numbers are output without quotes. Values are separated by a delimiter character, `,` by default. The delimiter character is defined by the [format_csv_delimiter](../operations/settings/settings.md#format_csv_delimiter) setting. Rows are separated by the Unix line feed (LF). Arrays are serialized in CSV as follows: first the array is serialized to a string as in the TabSeparated format, and then the resulting string is output to CSV in double quotes. Tuples in CSV format are serialized as separate columns (that is, their nesting in the tuple is lost).

```
clickhouse-client --format_csv_delimiter="|" --query="INSERT INTO test.csv FORMAT CSV" < data.csv
```

&ast;By default — `,`. See the [format_csv_delimiter](/operations/settings/settings/#format_csv_delimiter) setting for additional info.

When parsing, all values can be parsed either with or without quotes. Both double and single quotes are supported. Strings can also be given without quotes; in this case they are parsed up to the delimiter character or line feed (CR or LF). In violation of the RFC, when parsing strings without quotes, leading and trailing spaces and tabs are ignored. For the line feed, Unix (LF), Windows (CR LF), and Mac OS Classic (LF CR) variants are all supported.

`NULL` is formatted as `\N`.


@ -1,10 +1,12 @@
# Производительность # Производительность
По результатам внутреннего тестирования, ClickHouse обладает наиболее высокой производительностью (как наиболее высоким throughput на длинных запросах, так и наиболее низкой latency на коротких запросах), при соответствующем сценарии работы, среди доступных для тестирования систем подобного класса. Результаты тестирования можно посмотреть на отдельной странице. По результатам внутреннего тестирования в Яндексе, ClickHouse обладает наиболее высокой производительностью (как наиболее высокой пропускной способностью на длинных запросах, так и наиболее низкой задержкой на коротких запросах), при соответствующем сценарии работы, среди доступных для тестирования систем подобного класса. Результаты тестирования можно посмотреть на [отдельной странице](https://clickhouse.yandex/benchmark.html).
Также это подтверждают многочисленные независимые бенчмарки. Их не сложно найти в Интернете самостоятельно, либо можно воспользоваться [небольшой коллекцией ссылок по теме](https://clickhouse.yandex/#independent-benchmarks).
## Пропускная способность при обработке одного большого запроса ## Пропускная способность при обработке одного большого запроса
Throughput can be measured in rows per second or in megabytes per second. If the data fits in the page cache, a query of moderate complexity is processed on modern hardware at roughly 2-10 GB/s of uncompressed data on a single server (up to 30 GB/s in the simplest cases). If the data does not fit in the page cache, the speed depends on the disk subsystem and the data compression ratio. For example, if the disk subsystem allows reading at 400 MB/s and the compression ratio is 3, the speed will be around 1.2 GB/s. To get the speed in rows per second, divide the speed in bytes per second by the total size of the columns used in the query. For example, if 10-byte columns are extracted, the speed will be around 100-200 million rows per second.
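The same arithmetic as a quick back-of-the-envelope sketch (the 400 MB/s read speed, compression ratio of 3, and 10-byte row footprint are the illustrative numbers from the paragraph above, not measurements):

```bash
disk_read_mb_per_sec=400   # raw read speed of the disk subsystem
compression_ratio=3        # compressed-to-uncompressed ratio
bytes_per_row=10           # total size of the columns used by the query

uncompressed_mb=$((disk_read_mb_per_sec * compression_ratio))     # 1200 MB/s, i.e. ~1.2 GB/s
rows_per_sec=$((uncompressed_mb * 1000 * 1000 / bytes_per_row))   # 120,000,000 rows/s
echo "~${uncompressed_mb} MB/s uncompressed, ~${rows_per_sec} rows/s"
```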
With distributed query processing, the processing speed grows almost linearly, but only if the set of rows produced by aggregation or sorting is not too large.
@ -12,7 +14,7 @@
If a query uses the primary key, selects a not-too-large number of rows for processing (hundreds of thousands), and uses a not-too-large number of columns, you can expect a latency below 50 milliseconds (down to single-digit milliseconds in the best case), provided the data fits in the page cache. Otherwise, latency is dominated by the number of seeks. On rotating disks, for a system that is not too heavily loaded, the latency can be estimated with the formula: seek time (10 ms) \* number of columns in the query \* number of data parts.
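As a sketch, the same formula with placeholder numbers (none of these values come from a real system):

```bash
seek_time_ms=10   # typical seek time of a rotating disk
columns=5         # columns read by the query
parts=4           # data parts that have to be touched

echo "estimated latency: $((seek_time_ms * columns * parts)) ms"   # 10 * 5 * 4 = 200 ms
```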
## Throughput when processing numerous short queries
Under the same conditions, ClickHouse can handle several hundred queries per second on a single server (up to several thousand in the best case). Since this scenario is not typical for analytical DBMSes, we recommend expecting no more than 100 queries per second.


@ -1,14 +0,0 @@
# Possible silly questions
## Why not use MapReduce-like systems?
By MapReduce-like systems we mean distributed computing systems in which the reduce operation is implemented on top of a distributed sort. These include Hadoop and YT (YT is an internal Yandex system).
Such systems are not suitable for online queries because of their high latency. In other words, they cannot be used as the backend for a web interface.
Such systems are not suitable for real-time data updates.
Distributed sorting is not the optimal way to perform the reduce operation if the result of the operation and all intermediate results, if any, fit into the RAM of a single server, which is usually the case for queries running online. In that case, a hash table is the optimal way to perform the reduce operation. A common way to optimize MapReduce jobs is pre-aggregation (partial reduce) using an in-memory hash table; this optimization is done manually by the user.
Distributed sorting is the main cause of slowness when running simple MapReduce jobs.
MapReduce-like systems allow running arbitrary code on a cluster. But a declarative query language is better suited for OLAP tasks, since it allows experiments to be run quickly. For example, Hadoop has Hive and Pig; also consider Cloudera Impala, Shark (outdated) for Spark, as well as Spark SQL, Presto, and Apache Drill. However, performance on such tasks is far from optimal compared to specialized systems, and the relatively high latency makes it impossible to use these systems as the backend for a web interface.
YT allows storing groups of columns separately. But YT cannot be called a truly columnar system, since it has no fixed-length data types (to store numbers efficiently without extra "garbage") and no vectorized engine. Jobs in YT are executed by arbitrary code in streaming mode, so they cannot be optimized well enough (up to hundreds of millions of rows per second per server). In 2014-2016, "dynamic sorted tables" based on MergeTree, with strict value typing and an SQL-like query language, were under development in YT. Dynamic sorted tables are not suitable for OLAP tasks because data in them is stored by rows. The query language in YT is still at an early stage, which makes it impossible to rely on this functionality. YT developers target dynamic sorted tables for OLTP and key-value workloads.


@ -1,9 +1,10 @@
# The Yandex.Metrica task
ClickHouse was originally developed to power [Yandex.Metrica](https://metrika.yandex.ru/), [the second-largest web analytics platform in the world](http://w3techs.com/technologies/overview/traffic_analysis/all), and it remains its key component. With more than 13 trillion records in the database and more than 20 billion events per day, ClickHouse makes it possible to generate custom reports on the fly directly from non-aggregated data. This article briefly describes the goals ClickHouse had to meet in the early stages of its development.
Yandex.Metrica builds customized reports on the fly based on hits and sessions, with the time period and arbitrary segments defined by the end user. Complex aggregates, such as the number of unique users, are often required. New data for building reports arrives in real time.
As of April 2014, Yandex.Metrica was receiving about 12 billion events (page views and mouse clicks) daily. All of these events must be stored in order to build custom reports. A single query may require scanning millions of rows within no more than a few hundred milliseconds, or hundreds of millions of rows within no more than a few seconds.
## Usage in Yandex.Metrica and other Yandex services


@ -156,34 +156,36 @@
Dictionary configuration in ClickHouse:
```xml
<yandex>
    <dictionary>
        <name>table_name</name>
        <source>
            <odbc>
                <!-- The following parameters can be specified in connection_string: -->
                <!-- DSN=myconnection;UID=username;PWD=password;HOST=127.0.0.1;PORT=5432;DATABASE=my_db -->
                <connection_string>DSN=myconnection</connection_string>
                <table>postgresql_table</table>
            </odbc>
        </source>
        <lifetime>
            <min>300</min>
            <max>360</max>
        </lifetime>
        <layout>
            <hashed/>
        </layout>
        <structure>
            <id>
                <name>id</name>
            </id>
            <attribute>
                <name>some_column</name>
                <type>UInt64</type>
                <null_value>0</null_value>
            </attribute>
        </structure>
    </dictionary>
</yandex>
```
You may need to specify the full path to the driver library in `odbc.ini`: `DRIVER=/usr/local/lib/psqlodbcw.so`.
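As a quick check that the dictionary has loaded, you can read an attribute through the `dictGetUInt64` function; the dictionary and attribute names below come from the sample configuration above, and the key value `1` is an assumption:

```bash
# Returns some_column for key 1, or the configured null_value (0) if the key is absent.
clickhouse-client --query="SELECT dictGetUInt64('table_name', 'some_column', toUInt64(1))"
```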


@ -3,9 +3,8 @@ pages:
- 'Overview': 'index.md'
- 'Distinctive features of ClickHouse': 'introduction/distinctive_features.md'
- 'ClickHouse features that can be considered disadvantages': 'introduction/features_considered_disadvantages.md'
- 'The Yandex.Metrica task': 'introduction/ya_metrika_task.md'
- 'Everything you were afraid to ask': 'introduction/possible_silly_questions.md'
- 'Performance': 'introduction/performance.md'
- 'The Yandex.Metrica task': 'introduction/ya_metrika_task.md'
- 'Getting started':
- 'Deploying and running': 'getting_started/index.md'
@ -160,6 +159,9 @@ pages:
- 'clickhouse-copier': 'operations/utils/clickhouse-copier.md'
- 'clickhouse-local': 'operations/utils/clickhouse-local.md'
- 'F.A.Q.':
- 'General questions': 'faq/general.md'
- 'Development':
- 'hidden': 'development/index.md'
- 'Overview of ClickHouse architecture': 'development/architecture.md'


@ -4,9 +4,8 @@ pages:
- 'Обзор': 'index.md'
- 'Отличительные возможности ClickHouse': 'introduction/distinctive_features.md'
- 'Особенности ClickHouse, которые могут считаться недостатками': 'introduction/features_considered_disadvantages.md'
- 'Постановка задачи в Яндекс.Метрике': 'introduction/ya_metrika_task.md'
- 'Возможные глупые вопросы': 'introduction/possible_silly_questions.md'
- 'Производительность': 'introduction/performance.md'
- 'Постановка задачи в Яндекс.Метрике': 'introduction/ya_metrika_task.md'
- 'Начало работы':
- 'Установка и запуск': 'getting_started/index.md'
@ -166,6 +165,9 @@ pages:
- 'clickhouse-copier': 'operations/utils/clickhouse-copier.md'
- 'clickhouse-local': 'operations/utils/clickhouse-local.md'
- 'F.A.Q.':
- 'Общие вопросы': 'faq/general.md'
- 'Разработка':
- 'hidden': 'development/index.md'
- 'Overview of ClickHouse architecture': 'development/architecture.md'


@ -5,7 +5,7 @@
<nav class="md-footer-nav__inner md-grid">
{% for _ in range(0, 9) %}
{% if page.previous_page and page.previous_page.title == "hidden" %}
{{ page.__dict__.update({"previous_page": page.previous_page.previous_page}) or '' }}
{% endif %}
{% endfor %}
{% if page.previous_page %}
@ -25,7 +25,7 @@
{% endif %}
{% for _ in range(0, 9) %}
{% if page.next_page and page.next_page.title == "hidden" %}
{{ page.__dict__.update({"next_page": page.next_page.next_page}) or '' }}
{% endif %}
{% endfor %}
{% if page.next_page %}
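A short aside on the `or ''` added to the two template expressions above: Python's `dict.update()` returns `None`, and Jinja2 renders a bare `None` as the literal text `None`, so without the `or ''` that word would leak into the footer markup. The shell one-liner below (plain Python, not part of the template) illustrates the behaviour:

```bash
# update() mutates the dict but returns None; `None or ''` yields an empty string.
python3 -c "d = {'previous_page': 1}; print(repr(d.update({'previous_page': 2}) or ''))"
# prints: ''
```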


@ -390,7 +390,7 @@
<pre>
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
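For context on the switch to `sudo tee` (not part of the original snippet): with `sudo echo ... > file`, the output redirection is performed by the calling, unprivileged shell before `sudo` even runs, so writing under `/etc/apt/` fails; piping into `sudo tee` moves the write into the privileged process.

```bash
# Fails: the redirection is done by the non-root shell.
sudo echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" > /etc/apt/sources.list.d/clickhouse.list
# bash: /etc/apt/sources.list.d/clickhouse.list: Permission denied

# Works: tee runs as root and performs the write itself.
echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
```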