Merge remote-tracking branch 'upstream/master'

BayoNet 2018-10-18 13:52:53 +03:00
commit 5af663bb9c
491 changed files with 4671 additions and 1714 deletions

View File

@ -1,5 +1,8 @@
# Broken in macos. TODO: update clang, re-test, enable
if (NOT APPLE)
option (ENABLE_EMBEDDED_COMPILER "Set to TRUE to enable support for 'compile' option for query execution" 1)
option (USE_INTERNAL_LLVM_LIBRARY "Use bundled or system LLVM library. Default: system library for quicker developer builds." ${APPLE})
endif ()
if (ENABLE_EMBEDDED_COMPILER)
if (USE_INTERNAL_LLVM_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/llvm/llvm/CMakeLists.txt")

View File

@ -3,7 +3,7 @@ Motivation
For reproducible build, we need to control, what exactly version of boost we build,
because different versions of boost obviously have slightly different behaviour.
You may already have installed arbitary version of boost on your system, to build another projects.
You may already have installed arbitrary version of boost on your system, to build another projects.
We need to have all libraries with C++ interface to be located in tree and to be build together.
This is needed to allow quickly changing build options, that could introduce changes in ABI of that libraries.

View File

@ -1,7 +1,7 @@
This directory contains several hash-map implementations, similar in
API to SGI's hash_map class, but with different performance
characteristics. sparse_hash_map uses very little space overhead, 1-2
bits per entry. dense_hash_map is very fast, particulary on lookup.
bits per entry. dense_hash_map is very fast, particularly on lookup.
(sparse_hash_set and dense_hash_set are the set versions of these
routines.) On the other hand, these classes have requirements that
may not make them appropriate for all applications.
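The requirement hinted at above is that dense_hash_map must be told about a designated "empty" key (and a "deleted" key if erase() is used) that never appears in real data. A minimal sketch of that contract, assuming the bundled sparsehash headers are on the include path:

#include <iostream>
#include <string>
#include <sparsehash/dense_hash_map>

int main()
{
    google::dense_hash_map<std::string, int> counts;
    counts.set_empty_key("");        /// Mandatory before the first insert.
    counts.set_deleted_key("\x01");  /// Mandatory only if erase() is used; must differ from the empty key.

    counts["foo"] = 1;
    counts["bar"] = 2;
    counts.erase("foo");

    for (const auto & kv : counts)
        std::cout << kv.first << " => " << kv.second << '\n';
    return 0;
}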

View File

@ -2,10 +2,10 @@
set(VERSION_REVISION 54409 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 14 CACHE STRING "")
set(VERSION_PATCH 6 CACHE STRING "")
set(VERSION_GITHASH 899f41741cf9824d94bf6562c21b90c62ea4fee0 CACHE STRING "")
set(VERSION_DESCRIBE v18.14.6-testing CACHE STRING "")
set(VERSION_STRING 18.14.6 CACHE STRING "")
set(VERSION_PATCH 9 CACHE STRING "")
set(VERSION_GITHASH 457f8fd495b2812940e69c15ab5b499cd863aae4 CACHE STRING "")
set(VERSION_DESCRIBE v18.14.9-testing CACHE STRING "")
set(VERSION_STRING 18.14.9 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -110,6 +110,8 @@ private:
/// Don't execute new queries after timelimit or SIGINT or exception
std::atomic<bool> shutdown{false};
std::atomic<size_t> queries_executed{0};
struct Stats
{
Stopwatch watch;
@ -238,10 +240,12 @@ private:
size_t query_index = randomize ? distribution(generator) : i % queries.size();
if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
{
shutdown = true;
break;
}
}
shutdown = true;
pool.wait();
info_total.watch.stop();
@ -274,11 +278,12 @@ private:
{
extracted = queue.tryPop(query, 100);
if (shutdown)
if (shutdown || (max_iterations && queries_executed == max_iterations))
return;
}
execute(connection, query);
++queries_executed;
}
}
catch (...)

View File

@ -500,7 +500,7 @@ private:
return CRC32Hash()(StringRef(reinterpret_cast<const char *>(begin), (end - begin) * sizeof(CodePoint)));
}
/// By the way, we don't have to use actual Unicode numbers. We use just arbitary bijective mapping.
/// By the way, we don't have to use actual Unicode numbers. We use just arbitrary bijective mapping.
CodePoint readCodePoint(const char *& pos, const char * end)
{
size_t length = UTF8::seqLength(*pos);
@ -954,7 +954,7 @@ try
("structure,S", po::value<std::string>(), "structure of the initial table (list of column and type names)")
("input-format", po::value<std::string>(), "input format of the initial table data")
("output-format", po::value<std::string>(), "default output format")
("seed", po::value<std::string>(), "seed (arbitary string), must be random string with at least 10 bytes length")
("seed", po::value<std::string>(), "seed (arbitrary string), must be random string with at least 10 bytes length")
("limit", po::value<UInt64>(), "if specified - stop after generating that number of rows")
("silent", po::value<bool>()->default_value(false), "don't print information messages to stderr")
("order", po::value<UInt64>()->default_value(5), "order of markov model to generate strings")

View File

@ -94,7 +94,8 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
{
schema_name = params.get("schema");
LOG_TRACE(log, "Will fetch info for table '" << schema_name + "." + table_name << "'");
} else
}
else
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
LOG_TRACE(log, "Got connection str '" << connection_string << "'");

View File

@ -5,8 +5,8 @@ was possible segfaults or another faults in ODBC implementations, which can
crash whole clickhouse-server process.
This tool works via HTTP, not via pipes, shared memory, or TCP because:
- It's simplier to implement
- It's simplier to debug
- It's simpler to implement
- It's simpler to debug
- jdbc-bridge can be implemented in the same way
## Usage

View File

@ -6,10 +6,10 @@
namespace DB
{
/** Passing arbitary connection string to ODBC Driver Manager is insecure, for the following reasons:
/** Passing arbitrary connection string to ODBC Driver Manager is insecure, for the following reasons:
* 1. Driver Manager like unixODBC has multiple bugs like buffer overflow.
* 2. Driver Manager can interpret some parameters as a path to library for dlopen or a file to read,
* thus allows arbitary remote code execution.
* thus allows arbitrary remote code execution.
*
* This function will throw exception if connection string has insecure parameters.
* It may also modify connection string to harden it.
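A standalone sketch of the kind of check this describes; validateConnectionStringSketch and its list of forbidden parameters are illustrative assumptions, not the actual ClickHouse validator (which is stricter and also re-assembles the string):

#include <cctype>
#include <stdexcept>
#include <string>

std::string validateConnectionStringSketch(const std::string & connection_string)
{
    /// Parameters that let the Driver Manager load an arbitrary library or read/write an arbitrary file.
    static const char * forbidden_keys[] = {"DRIVER", "FILEDSN", "SAVEFILE"};

    std::string upper;
    for (char c : connection_string)
        upper += static_cast<char>(std::toupper(static_cast<unsigned char>(c)));

    for (const char * key : forbidden_keys)
        if (upper.find(std::string(key) + "=") != std::string::npos)   /// Naive substring match, good enough for a sketch.
            throw std::runtime_error(std::string("Forbidden ODBC connection string parameter: ") + key);

    return connection_string;
}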

View File

@ -107,7 +107,7 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
/// TODO Do positions need to be 1-based for this function?
size_t position = columns[1]->get64(row_num);
size_t position = columns[1]->getUInt(row_num);
/// If position is larger than size to which array will be cutted - simply ignore value.
if (length_to_resize && position >= length_to_resize)

View File

@ -607,7 +607,7 @@ struct AggregateFunctionAnyLastData : Data
/** Implement 'heavy hitters' algorithm.
* Selects most frequent value if its frequency is more than 50% in each thread of execution.
* Otherwise, selects some arbitary value.
* Otherwise, selects some arbitrary value.
* http://www.cs.umd.edu/~samir/498/karp.pdf
*/
template <typename Data>
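The linked paper describes the classic majority-vote scheme: track one candidate and a counter, and the surviving candidate is guaranteed to be the answer whenever some value really does exceed 50% frequency. A self-contained sketch of that core step (not the actual aggregate function state):

#include <cassert>
#include <vector>

template <typename T>
T selectMajorityCandidate(const std::vector<T> & values)
{
    T candidate{};
    size_t counter = 0;
    for (const auto & value : values)
    {
        if (counter == 0)
        {
            candidate = value;   /// Adopt a new candidate when the counter is exhausted.
            counter = 1;
        }
        else if (value == candidate)
            ++counter;
        else
            --counter;           /// A different value cancels out one occurrence of the candidate.
    }
    return candidate;            /// Correct if any value has > 50% frequency; otherwise arbitrary.
}

int main()
{
    std::vector<int> data{3, 1, 3, 3, 2, 3, 3};
    assert(selectMajorityCandidate(data) == 3);
    return 0;
}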

View File

@ -10,7 +10,7 @@ namespace DB
{
/** Aggregate function that takes arbitary number of arbitary arguments and does nothing.
/** Aggregate function that takes arbitrary number of arbitrary arguments and does nothing.
*/
class AggregateFunctionNothing final : public IAggregateFunctionHelper<AggregateFunctionNothing>
{

View File

@ -231,6 +231,14 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64
revision = server_revision;
}
UInt64 Connection::getServerRevision()
{
if (!connected)
connect();
return server_revision;
}
const String & Connection::getServerTimezone()
{
if (!connected)
@ -349,7 +357,7 @@ void Connection::sendQuery(
{
ClientInfo client_info_to_send;
if (!client_info)
if (!client_info || client_info->empty())
{
/// No client info passed - means this query initiated by me.
client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;

View File

@ -106,6 +106,7 @@ public:
void setDefaultDatabase(const String & database);
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision);
UInt64 getServerRevision();
const String & getServerTimezone();
const String & getServerDisplayName();

View File

@ -153,13 +153,9 @@ ConnectionPoolWithFailover::tryGetEntry(
{
result.entry = pool.get(settings, /* force_connected = */ false);
String server_name;
UInt64 server_version_major;
UInt64 server_version_minor;
UInt64 server_version_patch;
UInt64 server_revision;
UInt64 server_revision = 0;
if (table_to_check)
result.entry->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
server_revision = result.entry->getServerRevision();
if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{

View File

@ -87,28 +87,36 @@ void MultiplexedConnections::sendQuery(
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (replica_states.size() > 1)
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_states.size();
Settings modified_settings = settings;
for (size_t i = 0; i < replica_states.size(); ++i)
for (auto & replica : replica_states)
{
Connection * connection = replica_states[i].connection;
if (connection == nullptr)
if (!replica.connection)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
query_settings.parallel_replica_offset = i;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
/// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
}
size_t num_replicas = replica_states.size();
if (num_replicas > 1)
{
/// Use multiple replicas for parallel query processing.
modified_settings.parallel_replicas_count = num_replicas;
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
}
}
else
{
Connection * connection = replica_states[0].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
/// Use single replica.
replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
}
sent_query = true;

View File

@ -373,7 +373,7 @@ const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char *
/** We will read from src_arena.
* There is no limit for reading - it is assumed, that we can read all that we need after src_arena pointer.
* Buf ReadBufferFromMemory requires some bound. We will use arbitary big enough number, that will not overflow pointer.
* Buf ReadBufferFromMemory requires some bound. We will use arbitrary big enough number, that will not overflow pointer.
* NOTE Technically, this is not compatible with C++ standard,
* as we cannot legally compare pointers after last element + 1 of some valid memory region.
* Probably this will not work under UBSan.

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
/** ColumnConst contains another column with single element,
* but looks like a column with arbitary amount of same elements.
* but looks like a column with arbitrary amount of same elements.
*/
class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>
{

View File

@ -1,6 +1,3 @@
#include <cmath>
#include <ext/bit_cast.h>
#include <Common/Exception.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
@ -53,7 +50,7 @@ UInt64 ColumnDecimal<T>::get64(size_t n) const
{
if constexpr (sizeof(T) > sizeof(UInt64))
throw Exception(String("Method get64 is not supported for ") + getFamilyName(), ErrorCodes::NOT_IMPLEMENTED);
return ext::bit_cast<UInt64>(data[n]);
return static_cast<typename T::NativeType>(data[n]);
}
template <typename T>
@ -120,6 +117,14 @@ MutableColumnPtr ColumnDecimal<T>::cloneResized(size_t size) const
return std::move(res);
}
template <typename T>
void ColumnDecimal<T>::insertData(const char * src, size_t /*length*/)
{
T tmp;
memcpy(&tmp, src, sizeof(T));
data.emplace_back(tmp);
}
template <typename T>
void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{

View File

@ -89,7 +89,7 @@ public:
void reserve(size_t n) override { data.reserve(n); }
void insertFrom(const IColumn & src, size_t n) override { data.push_back(static_cast<const Self &>(src).getData()[n]); }
void insertData(const char * pos, size_t /*length*/) override { data.push_back(*reinterpret_cast<const T *>(pos)); }
void insertData(const char * pos, size_t /*length*/) override;
void insertDefault() override { data.push_back(T()); }
void insert(const Field & x) override { data.push_back(DB::get<typename NearestFieldType<T>::Type>(x)); }
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@ -112,6 +112,7 @@ public:
bool getBool(size_t n) const override { return bool(data[n]); }
Int64 getInt(size_t n) const override { return Int64(data[n] * scale); }
UInt64 get64(size_t n) const override;
bool isDefaultAt(size_t n) const override { return data[n] == 0; }
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const IColumn::Permutation & perm, size_t limit) const override;

View File

@ -92,7 +92,7 @@ public:
}
/** If column is numeric, return value of n-th element, casted to UInt64.
* For NULL values of Nullable column it is allowed to return arbitary value.
* For NULL values of Nullable column it is allowed to return arbitrary value.
* Otherwise throw an exception.
*/
virtual UInt64 getUInt(size_t /*n*/) const
@ -105,6 +105,7 @@ public:
throw Exception("Method getInt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual bool isDefaultAt(size_t n) const { return get64(n) == 0; }
virtual bool isNullAt(size_t /*n*/) const { return false; }
/** If column is numeric, return value of n-th element, casted to bool.
@ -174,7 +175,7 @@ public:
virtual const char * deserializeAndInsertFromArena(const char * pos) = 0;
/// Update state of hash function with value of n-th element.
/// On subsequent calls of this method for sequence of column values of arbitary types,
/// On subsequent calls of this method for sequence of column values of arbitrary types,
/// passed bytes to hash must identify sequence of values unambiguously.
virtual void updateHashWithValue(size_t n, SipHash & hash) const = 0;

View File

@ -254,7 +254,8 @@ struct ODBCBridgeMixin
static void startBridge(const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
{
Poco::Path path{config.getString("application.dir", "")};
/// Path to executable folder
Poco::Path path{config.getString("application.dir", "/usr/bin")};
path.setFileName(
#if CLICKHOUSE_SPLIT_BINARY
@ -264,9 +265,6 @@ struct ODBCBridgeMixin
#endif
);
if (!Poco::File(path).exists())
throw Exception("clickhouse binary (" + path.toString() + ") is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
std::stringstream command;
command << path.toString() <<

View File

@ -19,7 +19,7 @@ namespace DB
* This is unit of data processing.
* Also contains metadata - data types of columns and their names
* (either original names from a table, or generated names during temporary calculations).
* Allows to insert, remove columns in arbitary position, to change order of columns.
* Allows to insert, remove columns in arbitrary position, to change order of columns.
*/
class Context;
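A minimal sketch of how such a block is typically assembled, column by column (the column name and the single inserted value are illustrative):

#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypesNumber.h>

DB::Block makeSampleBlock()
{
    DB::Block block;

    auto column = DB::ColumnUInt64::create();
    column->insertValue(42);

    /// Each entry carries the column data together with its data type and name.
    block.insert(DB::ColumnWithTypeAndName(std::move(column), std::make_shared<DB::DataTypeUInt64>(), "answer"));
    return block;
}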

View File

@ -47,6 +47,10 @@
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54408
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226

View File

@ -16,7 +16,7 @@ struct TypePair
template <typename T, bool _int, bool _float, bool _dec, typename F>
template <typename T, bool _int, bool _float, bool _decimal, bool _datetime, typename F>
bool callOnBasicType(TypeIndex number, F && f)
{
if constexpr (_int)
@ -40,7 +40,7 @@ bool callOnBasicType(TypeIndex number, F && f)
}
}
if constexpr (_dec)
if constexpr (_decimal)
{
switch (number)
{
@ -63,40 +63,51 @@ bool callOnBasicType(TypeIndex number, F && f)
}
}
if constexpr (_datetime)
{
switch (number)
{
case TypeIndex::Date: return f(TypePair<T, UInt16>());
case TypeIndex::DateTime: return f(TypePair<T, UInt32>());
default:
break;
}
}
return false;
}
/// Unroll template using TypeIndex
template <bool _int, bool _float, bool _dec, typename F>
template <bool _int, bool _float, bool _decimal, bool _datetime, typename F>
inline bool callOnBasicTypes(TypeIndex type_num1, TypeIndex type_num2, F && f)
{
if constexpr (_int)
{
switch (type_num1)
{
case TypeIndex::UInt8: return callOnBasicType<UInt8, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::UInt16: return callOnBasicType<UInt16, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::UInt32: return callOnBasicType<UInt32, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::UInt64: return callOnBasicType<UInt64, _int, _float, _dec>(type_num2, std::forward<F>(f));
//case TypeIndex::UInt128: return callOnBasicType<UInt128, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::UInt8: return callOnBasicType<UInt8, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::UInt16: return callOnBasicType<UInt16, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::UInt32: return callOnBasicType<UInt32, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::UInt64: return callOnBasicType<UInt64, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
//case TypeIndex::UInt128: return callOnBasicType<UInt128, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Int8: return callOnBasicType<Int8, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Int16: return callOnBasicType<Int16, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Int32: return callOnBasicType<Int32, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Int64: return callOnBasicType<Int64, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Int128: return callOnBasicType<Int128, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Int8: return callOnBasicType<Int8, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Int16: return callOnBasicType<Int16, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Int32: return callOnBasicType<Int32, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Int64: return callOnBasicType<Int64, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Int128: return callOnBasicType<Int128, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
default:
break;
}
}
if constexpr (_dec)
if constexpr (_decimal)
{
switch (type_num1)
{
case TypeIndex::Decimal32: return callOnBasicType<Decimal32, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Decimal64: return callOnBasicType<Decimal64, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Decimal128: return callOnBasicType<Decimal128, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Decimal32: return callOnBasicType<Decimal32, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Decimal64: return callOnBasicType<Decimal64, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Decimal128: return callOnBasicType<Decimal128, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
default:
break;
}
@ -106,8 +117,19 @@ inline bool callOnBasicTypes(TypeIndex type_num1, TypeIndex type_num2, F && f)
{
switch (type_num1)
{
case TypeIndex::Float32: return callOnBasicType<Float32, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Float64: return callOnBasicType<Float64, _int, _float, _dec>(type_num2, std::forward<F>(f));
case TypeIndex::Float32: return callOnBasicType<Float32, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::Float64: return callOnBasicType<Float64, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
default:
break;
}
}
if constexpr (_datetime)
{
switch (type_num1)
{
case TypeIndex::Date: return callOnBasicType<UInt16, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
case TypeIndex::DateTime: return callOnBasicType<UInt32, _int, _float, _decimal, _datetime>(type_num2, std::forward<F>(f));
default:
break;
}

View File

@ -132,7 +132,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
is_first = false;
time_t next_row_time = next_cursor->all_columns[time_column_num]->get64(next_cursor->pos);
time_t next_row_time = next_cursor->all_columns[time_column_num]->getUInt(next_cursor->pos);
/// Is new key before rounding.
bool is_new_key = new_path || next_row_time != current_time;

View File

@ -216,7 +216,7 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number if non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
current_row_is_zero = current_row_is_zero && desc.merged_column->isDefaultAt(desc.merged_column->size() - 1);
}
else
{

View File

@ -172,6 +172,7 @@ static DataTypePtr create(const ASTPtr & arguments)
void registerDataTypeDateTime(DataTypeFactory & factory)
{
factory.registerDataType("DateTime", create, DataTypeFactory::CaseInsensitive);
factory.registerAlias("TIMESTAMP", "DateTime", DataTypeFactory::CaseInsensitive);
}

View File

@ -304,7 +304,7 @@ public:
virtual bool shouldAlignRightInPrettyFormats() const { return false; }
/** Does formatted value in any text format can contain anything but valid UTF8 sequences.
* Example: String (because it can contain arbitary bytes).
* Example: String (because it can contain arbitrary bytes).
* Counterexamples: numbers, Date, DateTime.
* For Enum, it depends.
*/

View File

@ -1,8 +1,10 @@
#pragma once
#include <Core/Types.h>
#include <type_traits>
#include <Core/Types.h>
#include <Common/UInt128.h>
namespace DB
{
@ -146,6 +148,7 @@ template <typename A> struct ResultOfBitNot
* UInt<x>, Int<y> -> Int<max(x*2, y)>
* Float<x>, [U]Int<y> -> Float<max(x, y*2)>
* Decimal<x>, Decimal<y> -> Decimal<max(x,y)>
* UUID, UUID -> UUID
* UInt64 , Int<x> -> Error
* Float<x>, [U]Int64 -> Error
*/
@ -168,7 +171,9 @@ struct ResultOfIf
? max(sizeof(A), sizeof(B)) * 2
: max(sizeof(A), sizeof(B))>::Type;
using Type = std::conditional_t<!IsDecimalNumber<A> && !IsDecimalNumber<B>, ConstructedType,
using ConstructedWithUUID = std::conditional_t<std::is_same_v<A, UInt128> && std::is_same_v<B, UInt128>, A, ConstructedType>;
using Type = std::conditional_t<!IsDecimalNumber<A> && !IsDecimalNumber<B>, ConstructedWithUUID,
std::conditional_t<IsDecimalNumber<A> && IsDecimalNumber<B>, std::conditional_t<(sizeof(A) > sizeof(B)), A, B>, Error>>;
};
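Working the documented rules through a few concrete pairs (these asserts merely restate the comment above and assume the trait behaves exactly as documented; the include path is the header's usual location):

#include <type_traits>
#include <DataTypes/NumberTraits.h>

using namespace DB;

/// UInt<x>, Int<y> -> Int<max(x*2, y)>: UInt8 already fits into Int16, no widening needed.
static_assert(std::is_same_v<NumberTraits::ResultOfIf<UInt8, Int16>::Type, Int16>);
/// UInt32 mixed with a signed type widens to Int<max(32*2, 8)> = Int64.
static_assert(std::is_same_v<NumberTraits::ResultOfIf<UInt32, Int8>::Type, Int64>);
/// Float<x>, [U]Int<y> -> Float<max(x, y*2)>: Float32 with Int32 widens to Float64.
static_assert(std::is_same_v<NumberTraits::ResultOfIf<Float32, Int32>::Type, Float64>);
/// UInt64 cannot be represented by any signed integer type -> Error.
static_assert(std::is_same_v<NumberTraits::ResultOfIf<UInt64, Int8>::Type, NumberTraits::Error>);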

View File

@ -1,5 +1,6 @@
#include <iomanip>
#include <Poco/Event.h>
#include <Poco/DirectoryIterator.h>
#include <common/logger_useful.h>
@ -41,7 +42,6 @@ namespace ErrorCodes
static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
static constexpr size_t TABLES_PARALLEL_LOAD_BUNCH_SIZE = 100;
namespace detail
{
@ -149,6 +149,9 @@ void DatabaseOrdinary::loadTables(
ErrorCodes::INCORRECT_FILE_NAME);
}
if (file_names.empty())
return;
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
* which does not correspond to order tables creation and does not correspond to order of their location on disk.
@ -160,15 +163,12 @@ void DatabaseOrdinary::loadTables(
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
Poco::Event all_tables_processed;
auto task_function = [&](FileNames::const_iterator begin, FileNames::const_iterator end)
auto task_function = [&](const String & table)
{
for (auto it = begin; it != end; ++it)
{
const String & table = *it;
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
@ -176,20 +176,14 @@ void DatabaseOrdinary::loadTables(
}
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
}
if (++tables_processed == total_tables)
all_tables_processed.set();
};
const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;
for (size_t i = 0; i < num_bunches; ++i)
for (const auto & filename : file_names)
{
auto begin = file_names.begin() + i * bunch_size;
auto end = (i + 1 == num_bunches)
? file_names.end()
: (file_names.begin() + (i + 1) * bunch_size);
auto task = std::bind(task_function, begin, end);
auto task = std::bind(task_function, filename);
if (thread_pool)
thread_pool->schedule(task);
@ -198,7 +192,7 @@ void DatabaseOrdinary::loadTables(
}
if (thread_pool)
thread_pool->wait();
all_tables_processed.wait();
/// After all tables was basically initialized, startup them.
startupTables(thread_pool);
@ -212,47 +206,38 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
size_t total_tables = tables.size();
Poco::Event all_tables_processed;
auto task_function = [&](Tables::iterator begin, Tables::iterator end)
if (!total_tables)
return;
auto task_function = [&](const StoragePtr & table)
{
for (auto it = begin; it != end; ++it)
{
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
it->second->startup();
}
table->startup();
if (++tables_processed == total_tables)
all_tables_processed.set();
};
const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;
auto begin = tables.begin();
for (size_t i = 0; i < num_bunches; ++i)
for (const auto & name_storage : tables)
{
auto end = begin;
if (i + 1 == num_bunches)
end = tables.end();
else
std::advance(end, bunch_size);
auto task = std::bind(task_function, begin, end);
auto task = std::bind(task_function, name_storage.second);
if (thread_pool)
thread_pool->schedule(task);
else
task();
begin = end;
}
if (thread_pool)
thread_pool->wait();
all_tables_processed.wait();
}

View File

@ -155,7 +155,8 @@ DictionarySourcePtr DictionarySourceFactory::create(
else if ("odbc" == source_type)
{
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
const auto & global_config = context.getConfigRef();
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(global_config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
#else
throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",

View File

@ -14,17 +14,28 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
}
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct,
const std::string & db,
const std::string & table,
const std::string & where,
IdentifierQuotingStyle quoting_style)
: dict_struct(dict_struct), db(db), table(table), where(where), quoting_style(quoting_style)
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_)
: dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
{
if (auto pos = table_.find('.'); pos != std::string::npos)
{
schema = table_.substr(0, pos);
table = table_.substr(pos + 1);
}
else
{
schema = "";
table = table_;
}
}
@ -124,6 +135,11 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
if (!where.empty())
@ -187,6 +203,12 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);
@ -250,6 +272,12 @@ std::string ExternalQueryBuilder::composeLoadKeysQuery(
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);

View File

@ -18,19 +18,20 @@ class WriteBuffer;
struct ExternalQueryBuilder
{
const DictionaryStructure & dict_struct;
const std::string & db;
const std::string & table;
std::string db;
std::string table;
std::string schema;
const std::string & where;
IdentifierQuotingStyle quoting_style;
ExternalQueryBuilder(
const DictionaryStructure & dict_struct,
const std::string & db,
const std::string & table,
const std::string & where,
IdentifierQuotingStyle quoting_style);
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_);
/** Generate a query to load all data. */
std::string composeLoadAllQuery() const;

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/typeid_cast.h>
@ -20,57 +21,48 @@ void ExternalResultDescription::init(const Block & sample_block_)
{
sample_block = sample_block_;
const auto num_columns = sample_block.columns();
types.reserve(num_columns);
names.reserve(num_columns);
sample_columns.reserve(num_columns);
types.reserve(sample_block.columns());
for (const auto idx : ext::range(0, num_columns))
for (auto & elem : sample_block)
{
const auto & column = sample_block.safeGetByPosition(idx);
const auto type = column.type.get();
/// If default value for column was not provided, use default from data type.
if (elem.column->empty())
elem.column = elem.type->createColumnConstWithDefaultValue(1)->convertToFullColumnIfConst();
bool is_nullable = elem.type->isNullable();
DataTypePtr type_not_nullable = removeNullable(elem.type);
const IDataType * type = type_not_nullable.get();
if (typeid_cast<const DataTypeUInt8 *>(type))
types.push_back(ValueType::UInt8);
types.emplace_back(ValueType::UInt8, is_nullable);
else if (typeid_cast<const DataTypeUInt16 *>(type))
types.push_back(ValueType::UInt16);
types.emplace_back(ValueType::UInt16, is_nullable);
else if (typeid_cast<const DataTypeUInt32 *>(type))
types.push_back(ValueType::UInt32);
types.emplace_back(ValueType::UInt32, is_nullable);
else if (typeid_cast<const DataTypeUInt64 *>(type))
types.push_back(ValueType::UInt64);
types.emplace_back(ValueType::UInt64, is_nullable);
else if (typeid_cast<const DataTypeInt8 *>(type))
types.push_back(ValueType::Int8);
types.emplace_back(ValueType::Int8, is_nullable);
else if (typeid_cast<const DataTypeInt16 *>(type))
types.push_back(ValueType::Int16);
types.emplace_back(ValueType::Int16, is_nullable);
else if (typeid_cast<const DataTypeInt32 *>(type))
types.push_back(ValueType::Int32);
types.emplace_back(ValueType::Int32, is_nullable);
else if (typeid_cast<const DataTypeInt64 *>(type))
types.push_back(ValueType::Int64);
types.emplace_back(ValueType::Int64, is_nullable);
else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(ValueType::Float32);
types.emplace_back(ValueType::Float32, is_nullable);
else if (typeid_cast<const DataTypeFloat64 *>(type))
types.push_back(ValueType::Float64);
types.emplace_back(ValueType::Float64, is_nullable);
else if (typeid_cast<const DataTypeString *>(type))
types.push_back(ValueType::String);
types.emplace_back(ValueType::String, is_nullable);
else if (typeid_cast<const DataTypeDate *>(type))
types.push_back(ValueType::Date);
types.emplace_back(ValueType::Date, is_nullable);
else if (typeid_cast<const DataTypeDateTime *>(type))
types.push_back(ValueType::DateTime);
types.emplace_back(ValueType::DateTime, is_nullable);
else if (typeid_cast<const DataTypeUUID *>(type))
types.push_back(ValueType::UUID);
types.emplace_back(ValueType::UUID, is_nullable);
else
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};
names.emplace_back(column.name);
sample_columns.emplace_back(column.column);
/// If default value for column was not provided, use default from data type.
if (sample_columns.back()->empty())
{
MutableColumnPtr mutable_column = (*std::move(sample_columns.back())).mutate();
column.type->insertDefaultInto(*mutable_column);
sample_columns.back() = std::move(mutable_column);
}
}
}

View File

@ -25,13 +25,11 @@ struct ExternalResultDescription
String,
Date,
DateTime,
UUID
UUID,
};
Block sample_block;
std::vector<ValueType> types;
std::vector<std::string> names;
Columns sample_columns;
std::vector<std::pair<ValueType, bool /* is_nullable */>> types;
void init(const Block & sample_block_);
};

View File

@ -94,11 +94,18 @@ namespace
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
continue;
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}

View File

@ -14,6 +14,7 @@
#include <Dictionaries/MongoDBBlockInputStream.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Common/FieldVisitors.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -179,13 +180,22 @@ Block MongoDBBlockInputStream::readImpl()
for (const auto idx : ext::range(0, size))
{
const auto & name = description.names[idx];
const auto & name = description.sample_block.getByPosition(idx).name;
const Poco::MongoDB::Element::Ptr value = document->get(name);
if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
insertDefaultValue(*columns[idx], *description.sample_columns[idx]);
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
else
insertValue(*columns[idx], description.types[idx], *value, name);
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, *value, name);
}
}
}

View File

@ -32,7 +32,7 @@ public:
String getName() const override { return "MongoDB"; }
Block getHeader() const override { return description.sample_block; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
Block readImpl() override;

View File

@ -4,6 +4,7 @@
#include <Dictionaries/MySQLBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <ext/range.h>
@ -82,9 +83,18 @@ Block MySQLBlockInputStream::readImpl()
{
const auto value = row[idx];
if (!value.isNull())
insertValue(*columns[idx], description.types[idx], value);
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertDefaultValue(*columns[idx], *description.sample_columns[idx]);
insertValue(*columns[idx], description.types[idx].first, value);
}
else
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
}
++num_rows;

View File

@ -21,7 +21,7 @@ public:
String getName() const override { return "MySQL"; }
Block getHeader() const override { return description.sample_block; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
Block readImpl() override;

View File

@ -2,6 +2,7 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -91,9 +92,18 @@ Block ODBCBlockInputStream::readImpl()
const Poco::Dynamic::Var & value = row[idx];
if (!value.isEmpty())
insertValue(*columns[idx], description.types[idx], value);
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertDefaultValue(*columns[idx], *description.sample_columns[idx]);
insertValue(*columns[idx], description.types[idx].first, value);
}
else
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
}
++iterator;

View File

@ -24,7 +24,7 @@ public:
String getName() const override { return "ODBC"; }
Block getHeader() const override { return description.sample_block; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
Block readImpl() override;

View File

@ -49,7 +49,7 @@ namespace ErrorCodes
* but only by whole bytes. For dates and datetimes - the same as for numbers.
* For example, hex(257) = '0101'.
* unhex(string) - Returns a string, hex of which is equal to `string` with regard of case and discarding one leading zero.
* If such a string does not exist, could return arbitary implementation specific value.
* If such a string does not exist, could return arbitrary implementation specific value.
*
* bitmaskToArray(x) - Returns an array of powers of two in the binary form of x. For example, bitmaskToArray(50) = [2, 16, 32].
*/

View File

@ -725,7 +725,7 @@ private:
return true;
};
if (!callOnBasicTypes<true, false, true>(left_number, right_number, call))
if (!callOnBasicTypes<true, false, true, false>(left_number, right_number, call))
throw Exception("Wrong call for " + getName() + " with " + col_left.type->getName() + " and " + col_right.type->getName(),
ErrorCodes::LOGICAL_ERROR);
}

View File

@ -230,6 +230,11 @@ public:
static FunctionPtr create(const Context &) { return std::make_shared<FunctionIf>(); }
private:
template <typename T0, typename T1>
static constexpr bool allow_arrays =
!IsDecimalNumber<T0> && !IsDecimalNumber<T1> &&
!std::is_same_v<T0, UInt128> && !std::is_same_v<T1, UInt128>;
template <typename T0, typename T1>
static UInt32 decimalScale(Block & block [[maybe_unused]], const ColumnNumbers & arguments [[maybe_unused]])
{
@ -314,7 +319,7 @@ private:
{
if constexpr (std::is_same_v<NumberTraits::Error, typename NumberTraits::ResultOfIf<T0, T1>::Type>)
return false;
else if constexpr (!IsDecimalNumber<T0> && !IsDecimalNumber<T1>)
else if constexpr (allow_arrays<T0, T1>)
{
using ResultType = typename NumberTraits::ResultOfIf<T0, T1>::Type;
@ -370,7 +375,7 @@ private:
{
if constexpr (std::is_same_v<NumberTraits::Error, typename NumberTraits::ResultOfIf<T0, T1>::Type>)
return false;
else if constexpr (!IsDecimalNumber<T0> && !IsDecimalNumber<T1>)
else if constexpr (allow_arrays<T0, T1>)
{
using ResultType = typename NumberTraits::ResultOfIf<T0, T1>::Type;
@ -975,9 +980,10 @@ public:
if (auto rigth_array = checkAndGetDataType<DataTypeArray>(arg_else.type.get()))
right_id = rigth_array->getNestedType()->getTypeId();
bool executed_with_nums = callOnBasicTypes<true, true, true>(left_id, right_id, call);
bool executed_with_nums = callOnBasicTypes<true, true, true, true>(left_id, right_id, call);
if (!( executed_with_nums
|| executeTyped<UInt128, UInt128>(cond_col, block, arguments, result, input_rows_count)
|| executeString(cond_col, block, arguments, result)
|| executeGenericArray(cond_col, block, arguments, result)
|| executeTuple(block, arguments, result, input_rows_count)))

View File

@ -340,7 +340,7 @@ struct ConvertImplGenericToString
ColumnString::Chars_t & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
data_to.resize(size * 2); /// Using coefficient 2 for initial size is arbitary.
data_to.resize(size * 2); /// Using coefficient 2 for initial size is arbitrary.
offsets_to.resize(size);
WriteBufferFromVector<ColumnString::Chars_t> write_buffer(data_to);

View File

@ -565,7 +565,7 @@ public:
vec_to.assign(rows, static_cast<UInt64>(0xe28dbde7fe22e41c));
}
/// The function supports arbitary number of arguments of arbitary types.
/// The function supports arbitrary number of arguments of arbitrary types.
bool is_first_argument = true;
for (size_t i = 0; i < arguments.size(); ++i)

View File

@ -162,7 +162,7 @@ private:
return execute<Type>(block, col_vec, result);
};
if (!callOnBasicType<void, true, true, true>(col.type->getTypeId(), call))
if (!callOnBasicType<void, true, true, true, false>(col.type->getTypeId(), call))
throw Exception{"Illegal column " + col.column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN};
}
@ -385,7 +385,7 @@ private:
TypeIndex left_index = col_left.type->getTypeId();
TypeIndex right_index = col_right.type->getTypeId();
if (!callOnBasicTypes<true, true, false>(left_index, right_index, call))
if (!callOnBasicTypes<true, true, false, false>(left_index, right_index, call))
throw Exception{"Illegal column " + col_left.column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN};
}

View File

@ -397,7 +397,7 @@ struct ArrayIndexStringImpl
}
};
/// Catch-all implementation for arrays of arbitary type.
/// Catch-all implementation for arrays of arbitrary type.
/// To compare with constant value, create non-constant column with single element,
/// and pass is_value_has_single_element_to_compare = true.
template <typename IndexConv, bool is_value_has_single_element_to_compare>
@ -555,7 +555,7 @@ public:
}
};
/// Catch-all implementation for arrays of arbitary type
/// Catch-all implementation for arrays of arbitrary type
/// when the 2nd function argument is a NULL value.
template <typename IndexConv>
struct ArrayIndexGenericNullImpl

View File

@ -85,7 +85,7 @@ public:
unsigned useconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6;
/// When sleeping, the query cannot be cancelled. For abitily to cancel query, we limit sleep time.
if (useconds > 3000000) /// The choice is arbitary
if (useconds > 3000000) /// The choice is arbitrary
throw Exception("The maximum sleep time is 3000000 microseconds. Requested: " + toString(useconds), ErrorCodes::TOO_SLOW);
::usleep(useconds);

View File

@ -8,7 +8,7 @@
namespace DB
{
using FunctionTimeSlot = FunctionDateOrDateTimeToSomething<DataTypeUInt32, TimeSlotImpl>;
using FunctionTimeSlot = FunctionDateOrDateTimeToSomething<DataTypeDateTime, TimeSlotImpl>;
void registerFunctionTimeSlot(FunctionFactory & factory)
{

View File

@ -121,17 +121,19 @@ public:
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (!WhichDataType(arguments[0]).isDateTime())
throw Exception("Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + ". Must be DateTime.",
if (!WhichDataType(arguments[0].type).isDateTime())
throw Exception("Illegal type " + arguments[0].type->getName() + " of first argument of function " + getName() + ". Must be DateTime.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!WhichDataType(arguments[1]).isUInt32())
throw Exception("Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + ". Must be UInt32.",
if (!WhichDataType(arguments[1].type).isUInt32())
throw Exception("Illegal type " + arguments[1].type->getName() + " of second argument of function " + getName() + ". Must be UInt32.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>());
/// If time zone is specified for source data type, attach it to the resulting type.
/// Note that there is no explicit time zone argument for this function (we specify 2 as an argument number with explicit time zone).
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 0)));
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override

View File

@ -1132,6 +1132,9 @@ void Aggregator::convertToBlockImpl(
if (data.empty())
return;
if (key_columns.size() != params.keys_size)
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
if (final)
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns);
else
@ -1151,7 +1154,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
{
for (const auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);
method.insertKeyIntoColumns(value, key_columns, key_sizes);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
@ -1169,10 +1172,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns) const
{
for (auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);
method.insertKeyIntoColumns(value, key_columns, key_sizes);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
@ -2085,7 +2087,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
/** `minus one` means the absence of information about the bucket
* - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
* If there is at least one block with a bucket number greater than zero, then there was a two-level aggregation.
* If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.
*/
auto max_bucket = bucket_to_blocks.rbegin()->first;
size_t has_two_level = max_bucket >= 0;

View File

@ -166,7 +166,7 @@ struct AggregationMethodOneNumber
/** Insert the key from the hash table into columns.
*/
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
{
static_cast<ColumnVector<FieldType> *>(key_columns[0].get())->insertData(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
@ -243,7 +243,7 @@ struct AggregationMethodString
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
@ -312,7 +312,7 @@ struct AggregationMethodFixedString
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
@ -580,7 +580,7 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = true;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
{
auto ref = Base::getValueRef(value);
static_cast<ColumnLowCardinality *>(key_columns[0].get())->insertData(ref.data, ref.size);
@ -783,8 +783,10 @@ struct AggregationMethodKeysFixed
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes & key_sizes)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes)
{
size_t keys_size = key_columns.size();
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
size_t pos = bitmap_size;
@ -891,10 +893,10 @@ struct AggregationMethodSerialized
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes &)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{
auto pos = value.first.data;
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < key_columns.size(); ++i)
pos = key_columns[i]->deserializeAndInsertFromArena(pos);
}
@ -1284,10 +1286,10 @@ public:
Block intermediate_header;
/// What to count.
ColumnNumbers keys;
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
const ColumnNumbers keys;
const AggregateDescriptions aggregates;
const size_t keys_size;
const size_t aggregates_size;
/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
@ -1344,9 +1346,6 @@ public:
{
intermediate_header = intermediate_header_;
}
/// Calculate the column numbers in `keys` and `aggregates`.
void calculateColumnNumbers(const Block & block);
};
Aggregator(const Params & params_);

View File

@ -0,0 +1,130 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryNormalizer.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_RESULT_OF_SCALAR_SUBQUERY;
extern const int TOO_MANY_ROWS;
}
static ASTPtr addTypeConversion(std::unique_ptr<ASTLiteral> && ast, const String & type_name)
{
auto func = std::make_shared<ASTFunction>();
ASTPtr res = func;
func->alias = ast->alias;
func->prefer_alias_to_column_name = ast->prefer_alias_to_column_name;
ast->alias.clear();
func->name = "CAST";
auto exp_list = std::make_shared<ASTExpressionList>();
func->arguments = exp_list;
func->children.push_back(func->arguments);
exp_list->children.emplace_back(ast.release());
exp_list->children.emplace_back(std::make_shared<ASTLiteral>(type_name));
return res;
}
void ExecuteScalarSubqueriesVisitor::visit(ASTSubquery * subquery, ASTPtr & ast, const DumpASTNode &) const
{
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
ASTPtr subquery_select = subquery->children.at(0);
BlockIO res = InterpreterSelectWithUnionQuery(
subquery_select, subquery_context, {}, QueryProcessingStage::Complete, subquery_depth + 1).execute();
Block block;
try
{
block = res.in->read();
if (!block)
{
/// Interpret subquery with empty result as Null literal
auto ast_new = std::make_unique<ASTLiteral>(Null());
ast_new->setAlias(ast->tryGetAlias());
ast = std::move(ast_new);
return;
}
if (block.rows() != 1 || res.in->read())
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::TOO_MANY_ROWS)
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
else
throw;
}
size_t columns = block.columns();
if (columns == 1)
{
auto lit = std::make_unique<ASTLiteral>((*block.safeGetByPosition(0).column)[0]);
lit->alias = subquery->alias;
lit->prefer_alias_to_column_name = subquery->prefer_alias_to_column_name;
ast = addTypeConversion(std::move(lit), block.safeGetByPosition(0).type->getName());
}
else
{
auto tuple = std::make_shared<ASTFunction>();
tuple->alias = subquery->alias;
ast = tuple;
tuple->name = "tuple";
auto exp_list = std::make_shared<ASTExpressionList>();
tuple->arguments = exp_list;
tuple->children.push_back(tuple->arguments);
exp_list->children.resize(columns);
for (size_t i = 0; i < columns; ++i)
{
exp_list->children[i] = addTypeConversion(
std::make_unique<ASTLiteral>((*block.safeGetByPosition(i).column)[0]),
block.safeGetByPosition(i).type->getName());
}
}
}
void ExecuteScalarSubqueriesVisitor::visit(ASTTableExpression *, ASTPtr &, const DumpASTNode &) const
{
/// Don't descend into subqueries in FROM section.
}
void ExecuteScalarSubqueriesVisitor::visit(ASTFunction * func, ASTPtr & ast, const DumpASTNode &) const
{
/// Don't descend into subqueries in arguments of IN operator.
/// But if an argument is not subquery, than deeper may be scalar subqueries and we need to descend in them.
if (functionIsInOrGlobalInOperator(func->name))
{
for (auto & child : ast->children)
{
if (child != func->arguments)
visit(child);
else
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
if (i != 1 || !typeid_cast<ASTSubquery *>(func->arguments->children[i].get()))
visit(func->arguments->children[i]);
}
}
else
visitChildren(ast);
}
}

View File

@ -0,0 +1,79 @@
#pragma once
#include <Common/typeid_cast.h>
#include <Parsers/DumpASTNode.h>
namespace DB
{
class Context;
class ASTSubquery;
class ASTFunction;
struct ASTTableExpression;
/** Replace subqueries that return exactly one row
* ("scalar" subqueries) to the corresponding constants.
*
* If the subquery returns more than one column, it is replaced by a tuple of constants.
*
* Features
*
* A replacement occurs during query analysis, and not during the main runtime.
* This means that the progress indicator will not work during the execution of these requests,
* and also such queries can not be aborted.
*
* But the query result can be used for the index in the table.
*
* Scalar subqueries are executed on the request-initializer server.
* The request is sent to remote servers with already substituted constants.
*/
class ExecuteScalarSubqueriesVisitor
{
public:
ExecuteScalarSubqueriesVisitor(const Context & context_, size_t subquery_depth_, std::ostream * ostr_ = nullptr)
: context(context_),
subquery_depth(subquery_depth_),
visit_depth(0),
ostr(ostr_)
{}
void visit(ASTPtr & ast) const
{
DumpASTNode dump(*ast, ostr, visit_depth, "executeScalarSubqueries");
if (!tryVisit<ASTSubquery>(ast, dump) &&
!tryVisit<ASTTableExpression>(ast, dump) &&
!tryVisit<ASTFunction>(ast, dump))
visitChildren(ast);
}
private:
const Context & context;
size_t subquery_depth;
mutable size_t visit_depth;
std::ostream * ostr;
void visit(ASTSubquery * subquery, ASTPtr & ast, const DumpASTNode & dump) const;
void visit(ASTFunction * func, ASTPtr & ast, const DumpASTNode &) const;
void visit(ASTTableExpression *, ASTPtr &, const DumpASTNode &) const;
void visitChildren(ASTPtr & ast) const
{
for (auto & child : ast->children)
visit(child);
}
template <typename T>
bool tryVisit(ASTPtr & ast, const DumpASTNode & dump) const
{
if (T * t = typeid_cast<T *>(ast.get()))
{
visit(t, ast, dump);
return true;
}
return false;
}
};
}

View File

@ -37,6 +37,7 @@
#include <Interpreters/ProjectionManipulation.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
@ -86,8 +87,6 @@ namespace ErrorCodes
extern const int MULTIPLE_EXPRESSIONS_FOR_ALIAS;
extern const int UNKNOWN_IDENTIFIER;
extern const int CYCLIC_ALIASES;
extern const int INCORRECT_RESULT_OF_SCALAR_SUBQUERY;
extern const int TOO_MANY_ROWS;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int INCORRECT_ELEMENT_OF_SET;
extern const int ALIAS_REQUIRED;
@ -150,16 +149,6 @@ const std::unordered_set<String> possibly_injective_function_names
namespace
{
bool functionIsInOperator(const String & name)
{
return name == "in" || name == "notIn";
}
bool functionIsInOrGlobalInOperator(const String & name)
{
return name == "in" || name == "notIn" || name == "globalIn" || name == "globalNotIn";
}
void removeDuplicateColumns(NamesAndTypesList & columns)
{
std::set<String> names;
@ -904,8 +893,13 @@ void ExpressionAnalyzer::addAliasColumns()
void ExpressionAnalyzer::executeScalarSubqueries()
{
LogAST log;
if (!select_query)
executeScalarSubqueriesImpl(query);
{
ExecuteScalarSubqueriesVisitor execute_scalar_subqueries_visitor(context, subquery_depth, log.stream());
execute_scalar_subqueries_visitor.visit(query);
}
else
{
for (auto & child : query->children)
@ -914,143 +908,14 @@ void ExpressionAnalyzer::executeScalarSubqueries()
if (!typeid_cast<const ASTTableExpression *>(child.get())
&& !typeid_cast<const ASTSelectQuery *>(child.get()))
{
executeScalarSubqueriesImpl(child);
ExecuteScalarSubqueriesVisitor execute_scalar_subqueries_visitor(context, subquery_depth, log.stream());
execute_scalar_subqueries_visitor.visit(child);
}
}
}
}
static ASTPtr addTypeConversion(std::unique_ptr<ASTLiteral> && ast, const String & type_name)
{
auto func = std::make_shared<ASTFunction>();
ASTPtr res = func;
func->alias = ast->alias;
func->prefer_alias_to_column_name = ast->prefer_alias_to_column_name;
ast->alias.clear();
func->name = "CAST";
auto exp_list = std::make_shared<ASTExpressionList>();
func->arguments = exp_list;
func->children.push_back(func->arguments);
exp_list->children.emplace_back(ast.release());
exp_list->children.emplace_back(std::make_shared<ASTLiteral>(type_name));
return res;
}
void ExpressionAnalyzer::executeScalarSubqueriesImpl(ASTPtr & ast)
{
/** Replace subqueries that return exactly one row
* ("scalar" subqueries) with the corresponding constants.
*
* If the subquery returns more than one column, it is replaced by a tuple of constants.
*
* Features
*
* The replacement happens during query analysis, not during the main execution.
* This means that the progress indicator will not work while such queries are executed,
* and also such queries cannot be aborted.
*
* But the query result can be used for the index in the table.
*
* Scalar subqueries are executed on the query-initiating server.
* The query is sent to remote servers with the constants already substituted.
*/
if (ASTSubquery * subquery = typeid_cast<ASTSubquery *>(ast.get()))
{
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
ASTPtr subquery_select = subquery->children.at(0);
BlockIO res = InterpreterSelectWithUnionQuery(subquery_select, subquery_context, {}, QueryProcessingStage::Complete, subquery_depth + 1).execute();
Block block;
try
{
block = res.in->read();
if (!block)
{
/// Interpret subquery with empty result as Null literal
auto ast_new = std::make_unique<ASTLiteral>(Null());
ast_new->setAlias(ast->tryGetAlias());
ast = std::move(ast_new);
return;
}
if (block.rows() != 1 || res.in->read())
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::TOO_MANY_ROWS)
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
else
throw;
}
size_t columns = block.columns();
if (columns == 1)
{
auto lit = std::make_unique<ASTLiteral>((*block.safeGetByPosition(0).column)[0]);
lit->alias = subquery->alias;
lit->prefer_alias_to_column_name = subquery->prefer_alias_to_column_name;
ast = addTypeConversion(std::move(lit), block.safeGetByPosition(0).type->getName());
}
else
{
auto tuple = std::make_shared<ASTFunction>();
tuple->alias = subquery->alias;
ast = tuple;
tuple->name = "tuple";
auto exp_list = std::make_shared<ASTExpressionList>();
tuple->arguments = exp_list;
tuple->children.push_back(tuple->arguments);
exp_list->children.resize(columns);
for (size_t i = 0; i < columns; ++i)
{
exp_list->children[i] = addTypeConversion(
std::make_unique<ASTLiteral>((*block.safeGetByPosition(i).column)[0]),
block.safeGetByPosition(i).type->getName());
}
}
}
else
{
/** Don't descend into subqueries in FROM section.
*/
if (!typeid_cast<ASTTableExpression *>(ast.get()))
{
/** Don't descend into subqueries in arguments of IN operator.
* But if an argument is not a subquery, then there may be scalar subqueries deeper inside, and we need to descend into them.
*/
ASTFunction * func = typeid_cast<ASTFunction *>(ast.get());
if (func && functionIsInOrGlobalInOperator(func->name))
{
for (auto & child : ast->children)
{
if (child != func->arguments)
executeScalarSubqueriesImpl(child);
else
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
if (i != 1 || !typeid_cast<ASTSubquery *>(func->arguments->children[i].get()))
executeScalarSubqueriesImpl(func->arguments->children[i]);
}
}
else
for (auto & child : ast->children)
executeScalarSubqueriesImpl(child);
}
}
}
void ExpressionAnalyzer::optimizeGroupBy()
{
if (!(select_query && select_query->group_expression_list))

View File

@ -322,8 +322,13 @@ class LLVMPreparedFunction : public PreparedFunctionImpl
public:
LLVMPreparedFunction(std::string name_, std::shared_ptr<LLVMContext> context)
: name(std::move(name_)), context(context), function(context->symbols.at(name))
{}
: name(std::move(name_)), context(context)
{
auto it = context->symbols.find(name);
if (context->symbols.end() == it)
throw Exception("Cannot find symbol " + name + " in LLVMContext", ErrorCodes::LOGICAL_ERROR);
function = it->second;
}
String getName() const override { return name; }
@ -510,6 +515,15 @@ LLVMFunction::LLVMFunction(const ExpressionActions::Actions & actions, std::shar
compileFunctionToLLVMByteCode(context, *this);
}
llvm::Value * LLVMFunction::compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const
{
auto it = subexpressions.find(name);
if (subexpressions.end() == it)
throw Exception("Cannot find subexpression " + name + " in LLVMFunction", ErrorCodes::LOGICAL_ERROR);
return it->second(builder, values);
}
PreparedFunctionPtr LLVMFunction::prepare(const Block &, const ColumnNumbers &, size_t) const { return std::make_shared<LLVMPreparedFunction>(name, context); }
bool LLVMFunction::isDeterministic() const
@ -691,8 +705,6 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
static LLVMTargetInitializer initializer;
auto dependents = getActionsDependents(actions, output_columns);
/// Initialize context as late as possible and only if needed
std::shared_ptr<LLVMContext> context;
std::vector<ExpressionActions::Actions> fused(actions.size());
for (size_t i = 0; i < actions.size(); ++i)
{
@ -708,7 +720,7 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
auto hash_key = ExpressionActions::ActionsHash{}(fused[i]);
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (counter[hash_key]++ < min_count_to_compile)
continue;
}
@ -716,26 +728,24 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
std::shared_ptr<LLVMFunction> fn;
if (compilation_cache)
{
/// Lock here to be sure that all functions will be compiled
std::lock_guard<std::mutex> lock(mutex);
/// Don't use getOrSet here, because sometimes we need to initialize context
fn = compilation_cache->get(hash_key);
if (!fn)
std::tie(fn, std::ignore) = compilation_cache->getOrSet(hash_key, [&inlined_func=std::as_const(fused[i]), &sample_block] ()
{
if (!context)
context = std::make_shared<LLVMContext>();
Stopwatch watch;
fn = std::make_shared<LLVMFunction>(fused[i], context, sample_block);
std::shared_ptr<LLVMContext> context = std::make_shared<LLVMContext>();
auto result_fn = std::make_shared<LLVMFunction>(inlined_func, context, sample_block);
size_t used_memory = context->compileAllFunctionsToNativeCode();
ProfileEvents::increment(ProfileEvents::CompileExpressionsBytes, used_memory);
ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds());
compilation_cache->set(hash_key, fn);
}
return result_fn;
});
}
else
{
if (!context)
context = std::make_shared<LLVMContext>();
std::shared_ptr<LLVMContext> context = std::make_shared<LLVMContext>();
Stopwatch watch;
fn = std::make_shared<LLVMFunction>(fused[i], context, sample_block);
size_t used_memory = context->compileAllFunctionsToNativeCode();
ProfileEvents::increment(ProfileEvents::CompileExpressionsBytes, used_memory);
ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds());
}
@ -751,22 +761,12 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
fused[*dep].insert(fused[*dep].end(), fused[i].begin(), fused[i].end());
}
if (context)
{
/// Lock here, because other threads can get uncompiled functions from the cache
std::lock_guard<std::mutex> lock(mutex);
size_t used_memory = context->compileAllFunctionsToNativeCode();
ProfileEvents::increment(ProfileEvents::CompileExpressionsBytes, used_memory);
}
for (size_t i = 0; i < actions.size(); ++i)
{
if (actions[i].type == ExpressionAction::APPLY_FUNCTION && actions[i].is_function_compiled)
{
actions[i].function = actions[i].function_base->prepare({}, {}, 0); /// Arguments are not used for LLVMFunction.
}
}
}
}

View File

@ -30,7 +30,7 @@ public:
bool isCompilable() const override { return true; }
llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override { return subexpressions.at(name)(builder, values); }
llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override;
String getName() const override { return name; }

View File

@ -417,6 +417,22 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
res.subqueries_for_sets = query_analyzer->getSubqueriesForSets();
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are those that can change the number of rows.
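/// For example (an illustrative query, not taken from this change): SELECT * FROM t PREWHERE arrayJoin(arr) = 1
/// would change the number of rows inside PREWHERE, so it is rejected by the check below.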
if (res.prewhere_info)
{
auto check_actions = [](const ExpressionActionsPtr & actions)
{
if (actions)
for (const auto & action : actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
};
check_actions(res.prewhere_info->prewhere_actions);
check_actions(res.prewhere_info->alias_actions);
check_actions(res.prewhere_info->remove_columns_actions);
}
return res;
}
@ -675,10 +691,10 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
}
}
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, const PrewhereInfoPtr & prewhere_info)
{
const Settings & settings = context.getSettingsRef();
/// Actions to calculate ALIAS if required.
@ -806,7 +822,6 @@ void InterpreterSelectQuery::executeFetchColumns(
}
}
/// Limitation on the number of columns to read.
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
if (!only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)

View File

@ -21,16 +21,6 @@ namespace ErrorCodes
}
namespace
{
bool functionIsInOrGlobalInOperator(const String & name)
{
return name == "in" || name == "notIn" || name == "globalIn" || name == "globalNotIn";
}
}
QueryNormalizer::QueryNormalizer(ASTPtr & query, const QueryNormalizer::Aliases & aliases,
const Settings & settings, const Names & all_column_names,
const TableNamesAndColumnNames & table_names_and_column_names)

View File

@ -1,5 +1,6 @@
#pragma once
#include <Core/Names.h>
#include <Parsers/IAST.h>
#include <Interpreters/Settings.h>
#include <Interpreters/evaluateQualified.h>
@ -7,6 +8,17 @@
namespace DB
{
inline bool functionIsInOperator(const String & name)
{
return name == "in" || name == "notIn";
}
inline bool functionIsInOrGlobalInOperator(const String & name)
{
return functionIsInOperator(name) || name == "globalIn" || name == "globalNotIn";
}
using TableNameAndColumnNames = std::pair<DatabaseAndTableWithAlias, Names>;
using TableNamesAndColumnNames = std::vector<TableNameAndColumnNames>;

View File

@ -277,7 +277,7 @@ struct Settings
M(SettingBool, log_query_settings, true, "Log query settings into the query_log.") \
M(SettingBool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting has effect only when 'log_queries' is true.") \
M(SettingString, send_logs_level, "none", "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'none'") \
M(SettingBool, enable_optimize_predicate_expression, 1, "If it is set to true, optimize predicates to subqueries.") \
M(SettingBool, enable_optimize_predicate_expression, 0, "If it is set to true, optimize predicates to subqueries.") \
\
M(SettingUInt64, low_cardinality_max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for LowCardinality type.") \
M(SettingBool, low_cardinality_use_single_dictionary_for_part, false, "LowCardinality type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.") \

View File

@ -12,7 +12,7 @@ void ASTLiteral::appendColumnNameImpl(WriteBuffer & ostr) const
/// Special case for very large arrays. Instead of listing all elements, we will use a hash of them.
/// (Otherwise the column name would be too long, which would lead to a significant slowdown of expression analysis.)
if (value.getType() == Field::Types::Array
&& value.get<const Array &>().size() > 100) /// 100 - just arbitary value.
&& value.get<const Array &>().size() > 100) /// 100 - just arbitrary value.
{
SipHash hash;
applyVisitor(FieldVisitorHash(hash), value);

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const MergeTreeData & storage,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,

View File

@ -18,7 +18,7 @@ class MergeTreeBaseBlockInputStream : public IProfilingBlockInputStream
{
public:
MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const MergeTreeData & storage,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,
@ -47,7 +47,7 @@ protected:
void injectVirtualColumns(Block & block) const;
protected:
MergeTreeData & storage;
const MergeTreeData & storage;
PrewhereInfoPtr prewhere_info;

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
MergeTreeBlockInputStream::MergeTreeBlockInputStream(
MergeTreeData & storage_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & owned_data_part_,
size_t max_block_size_rows_,
size_t preferred_block_size_bytes_,

View File

@ -17,7 +17,7 @@ class MergeTreeBlockInputStream : public MergeTreeBaseBlockInputStream
{
public:
MergeTreeBlockInputStream(
MergeTreeData & storage,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & owned_data_part,
size_t max_block_size_rows,
size_t preferred_block_size_bytes,

View File

@ -1223,7 +1223,9 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have the old name of the shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */, unused_written_offsets);
in.readPrefix();
out.writePrefix();

View File

@ -481,7 +481,7 @@ public:
bool skip_sanity_checks);
/// Should be called if part data is suspected to be corrupted.
void reportBrokenPart(const String & name)
void reportBrokenPart(const String & name) const
{
broken_part_callback(name);
}

View File

@ -22,8 +22,6 @@
#include <Interpreters/MutationsInterpreter.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
@ -750,7 +748,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->progress.store(column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
BlockInputStreams column_part_streams(parts.size());
NameSet offset_columns_written;
auto it_name_and_type = gathering_columns.cbegin();
@ -767,22 +764,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
column_num < gathering_column_names_size;
++column_num, ++it_name_and_type)
{
const String & column_name = it_name_and_type->name;
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = Nested::extractTableName(column_name);
Names column_name_{column_name};
Names column_names{column_name};
Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);
bool offset_written = offset_columns_written.count(offset_column_name);
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, true, min_bytes_when_use_direct_io, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
@ -793,7 +788,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, offset_written);
MergedColumnOnlyOutputStream column_to(
data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns);
size_t column_elems_written = 0;
column_to.writePrefix();
@ -811,9 +807,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
}
if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->columns_written = merging_column_names.size() + column_num;
@ -971,7 +964,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
createHardLink(dir_it.path().toString(), destination.toString());
}
MergedColumnOnlyOutputStream out(data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets);
in->readPrefix();
out.writePrefix();

View File

@ -27,7 +27,7 @@ struct MergeTreeDataPart
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPart(MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), name(name_), info(info_)
{
}
@ -77,7 +77,7 @@ struct MergeTreeDataPart
bool isEmpty() const { return rows_count == 0; }
MergeTreeData & storage;
const MergeTreeData & storage;
String name;
MergeTreePartInfo info;

View File

@ -67,7 +67,7 @@ namespace ErrorCodes
}
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
: data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
{
}

View File

@ -17,7 +17,7 @@ class KeyCondition;
class MergeTreeDataSelectExecutor
{
public:
MergeTreeDataSelectExecutor(MergeTreeData & data_);
MergeTreeDataSelectExecutor(const MergeTreeData & data_);
/** When reading, selects a set of parts that covers the desired range of the index.
* max_block_number_to_read - if not zero, do not read all the parts whose right border is greater than this threshold.
@ -40,7 +40,7 @@ public:
Int64 max_block_number_to_read) const;
private:
MergeTreeData & data;
const MergeTreeData & data;
Logger * log;

View File

@ -16,7 +16,7 @@ namespace DB
MergeTreeReadPool::MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
RangesInDataParts parts, const MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks)

View File

@ -67,7 +67,7 @@ private:
public:
MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
RangesInDataParts parts, const MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks = false);
@ -91,7 +91,7 @@ private:
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
std::vector<std::shared_lock<std::shared_mutex>> per_part_columns_lock;
MergeTreeData & data;
const MergeTreeData & data;
Names column_names;
Names ordered_names;
bool do_not_steal_tasks;

View File

@ -36,7 +36,7 @@ MergeTreeReader::~MergeTreeReader() = default;
MergeTreeReader::MergeTreeReader(const String & path,
const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
UncompressedCache * uncompressed_cache, MarkCache * mark_cache, bool save_marks_in_cache,
MergeTreeData & storage, const MarkRanges & all_mark_ranges,
const MergeTreeData & storage, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type)

View File

@ -30,7 +30,7 @@ public:
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
bool save_marks_in_cache,
MergeTreeData & storage, const MarkRanges & all_mark_ranges,
const MergeTreeData & storage, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size,
const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{},
@ -111,7 +111,7 @@ private:
/// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data.
bool save_marks_in_cache;
MergeTreeData & storage;
const MergeTreeData & storage;
MarkRanges all_mark_ranges;
size_t aio_threshold;
size_t max_read_buffer_size;

View File

@ -14,7 +14,7 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
const size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const MergeTreeData & storage,
const bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,

View File

@ -21,7 +21,7 @@ public:
const size_t max_block_size,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const MergeTreeData & storage,
const bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,

View File

@ -19,6 +19,7 @@ constexpr auto MARKS_FILE_EXTENSION = ".mrk";
}
/// Implementation of IMergedBlockOutputStream.
IMergedBlockOutputStream::IMergedBlockOutputStream(
@ -70,7 +71,7 @@ void IMergedBlockOutputStream::addStreams(
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
const String & name, OffsetColumns & offset_columns, bool skip_offsets)
const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
@ -93,7 +94,7 @@ void IMergedBlockOutputStream::writeData(
const String & name,
const IDataType & type,
const IColumn & column,
OffsetColumns & offset_columns,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state)
{
@ -304,7 +305,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
@ -395,7 +396,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
size_t rows = block.rows();
/// The set of written offset columns, so that shared offsets of nested structure columns are not written several times
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns;
auto sort_columns = storage.getPrimarySortColumns();
@ -427,7 +428,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (serialization_states.empty())
{
serialization_states.reserve(columns_list.size());
OffsetColumns tmp_offset_columns;
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (const auto & col : columns_list)
@ -501,12 +502,15 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// Implementation of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_)
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionSettings compression_settings, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns)
{
}
@ -517,7 +521,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
column_streams.clear();
serialization_states.clear();
serialization_states.reserve(block.columns());
OffsetColumns tmp_offset_columns;
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (size_t i = 0; i < block.columns(); ++i)
@ -535,7 +539,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
size_t rows = block.rows();
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
@ -558,11 +562,11 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
auto & column = header.safeGetByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets);
auto & column = header.getByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}

View File

@ -23,8 +23,9 @@ public:
CompressionSettings compression_settings_,
size_t aio_threshold_);
using WrittenOffsetColumns = std::set<std::string>;
protected:
using OffsetColumns = std::set<std::string>;
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
@ -67,10 +68,10 @@ protected:
void addStreams(const String & path, const String & name, const IDataType & type, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, OffsetColumns & offset_columns, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns,
bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state);
MergeTreeData & storage;
@ -150,13 +151,17 @@ private:
};
/// Writes only those columns that are in `block`
/// Writes only those columns that are in `header`
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
/// Pass an empty 'already_written_offset_columns' the first time, then pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of a Nested data structure in different instances of MergedColumnOnlyOutputStream.
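/// A usage sketch (assumed, identifiers are illustrative only):
///     IMergedBlockOutputStream::WrittenOffsetColumns written_offsets;
///     MergedColumnOnlyOutputStream out_x(data, header_x, path, false, compression, false, written_offsets); // writes Nested.x and the shared offsets
///     MergedColumnOnlyOutputStream out_y(data, header_y, path, false, compression, false, written_offsets); // sees the offsets as already written and does not duplicate them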
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_);
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionSettings compression_settings, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -171,6 +176,9 @@ private:
bool initialized = false;
bool sync;
bool skip_offsets;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
};
}

View File

@ -15,6 +15,7 @@
#include <Databases/IDatabase.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTLiteral.h>
@ -3826,6 +3827,11 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
optimize->database = leader_address.database;
optimize->table = leader_address.table;
}
else if (auto * drop = typeid_cast<ASTDropQuery *>(new_query.get()); drop->kind == ASTDropQuery::Kind::Truncate)
{
drop->database = leader_address.database;
drop->table = leader_address.table;
}
else
throw Exception("Can't proxy this query. Unsupported query type", ErrorCodes::NOT_IMPLEMENTED);

View File

@ -17,7 +17,7 @@ class Context;
* You could also specify a limit (how many numbers to give).
* If multithreaded is specified, numbers will be generated in several streams
* (and result could be out of order). If both multithreaded and limit are specified,
* the table could give you not exactly 1..limit range, but some arbitary 'limit' numbers.
* the table could give you not exactly 1..limit range, but some arbitrary 'limit' numbers.
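* For example (illustrative): SELECT * FROM system.numbers_mt LIMIT 10 may return ten arbitrary distinct numbers,
* while SELECT * FROM system.numbers LIMIT 10 returns 0..9 in order.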
*/
class StorageSystemNumbers : public ext::shared_ptr_helper<StorageSystemNumbers>, public IStorage
{

View File

@ -0,0 +1,6 @@
# Quick syntax check (2 minutes on 16-core server)
mkdir build && cd build
CC=clang-8 CXX=clang++-8 cmake -D ENABLE_EMBEDDED_COMPILER=0 -D CMAKE_BUILD_TYPE=Debug ..
ninja re2_st
time jq --raw-output '.[] | .command' compile_commands.json | grep -P -- ' -o [^ ]+\.o' | grep -v -P -- '-c .+/contrib/' | grep -vP '\.s$' | sed -r -e 's/ -o [^ ]+\.o/ -fsyntax-only/' | sort -R | xargs -I{} -P$(nproc) sh -c '{}'

View File

@ -14,9 +14,9 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2`
(strongly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo`
(strongly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2`
If you want to run the tests under a non-privileged user, you must add this user to the `docker` group: `sudo usermod -aG docker $USER` and re-login.
(You must close all your sessions; for example, restart your computer.)

View File

@ -13,6 +13,7 @@ import pymysql
import xml.dom.minidom
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
import psycopg2
import docker
from docker.errors import ContainerError
@ -79,6 +80,7 @@ class ClickHouseCluster:
self.instances = {}
self.with_zookeeper = False
self.with_mysql = False
self.with_postgres = False
self.with_kafka = False
self.with_odbc_drivers = False
@ -92,7 +94,7 @@ class ClickHouseCluster:
cmd += " client"
return cmd
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@ -127,6 +129,12 @@ class ClickHouseCluster:
self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
if with_postgres and not self.with_postgres:
self.with_postgres = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
if with_odbc_drivers and not self.with_odbc_drivers:
self.with_odbc_drivers = True
if not self.with_mysql:
@ -134,6 +142,12 @@ class ClickHouseCluster:
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
if not self.with_postgres:
self.with_postgres = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
if with_kafka and not self.with_kafka:
self.with_kafka = True
@ -168,6 +182,21 @@ class ClickHouseCluster:
raise Exception("Cannot wait MySQL container")
def wait_postgres_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.close()
print "Postgres Started"
return
except Exception as ex:
print "Can't connect to Postgres " + str(ex)
time.sleep(0.5)
raise Exception("Cannot wait Postgres container")
def wait_zookeeper_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
@ -204,20 +233,24 @@ class ClickHouseCluster:
self.docker_client = docker.from_env(version=self.docker_api_version)
if self.with_zookeeper and self.base_zookeeper_cmd:
subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate'])
for command in self.pre_zookeeper_commands:
self.run_kazoo_commands_with_retries(command, repeats=5)
self.wait_zookeeper_to_start(120)
if self.with_mysql and self.base_mysql_cmd:
subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate'])
self.wait_mysql_to_start(120)
if self.with_postgres and self.base_postgres_cmd:
subprocess_check_call(self.base_postgres_cmd + ['up', '-d', '--force-recreate'])
self.wait_postgres_to_start(120)
if self.with_kafka and self.base_kafka_cmd:
subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate'])
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate'])
start_deadline = time.time() + 20.0 # seconds
for instance in self.instances.itervalues():
@ -281,7 +314,7 @@ services:
- {logs_dir}:/var/log/clickhouse-server/
{odbc_ini_path}
entrypoint:
- /usr/bin/clickhouse
- clickhouse
- server
- --config-file=/etc/clickhouse-server/config.xml
- --log-file=/var/log/clickhouse-server/clickhouse-server.log
@ -444,8 +477,18 @@ class ClickHouseInstance:
},
"PostgreSQL": {
"DSN": "postgresql_odbc",
"Database": "postgres",
"UserName": "postgres",
"Password": "mysecretpassword",
"Port": "5432",
"Servername": "postgres1",
"Protocol": "9.3",
"ReadOnly": "No",
"RowVersioning": "No",
"ShowSystemTables": "No",
"Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
"Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
"ConnSettings": "",
}
}
else:

View File

@ -0,0 +1,9 @@
version: '2'
services:
postgres1:
image: postgres
restart: always
environment:
POSTGRES_PASSWORD: mysecretpassword
ports:
- 5432:5432

View File

@ -0,0 +1,38 @@
<dictionaries>
<dictionary>
<name>postgres_odbc_hashed</name>
<source>
<odbc>
<table>clickhouse.test_table</table>
<connection_string>DSN=postgresql_odbc;</connection_string>
<db>postgres</db>
</odbc>
</source>
<lifetime>
<min>5</min>
<max>5</max>
</lifetime>
<layout>
<hashed />
</layout>
<structure>
<id>
<name>column1</name>
</id>
<attribute>
<name>column1</name>
<type>Int64</type>
<null_value>1</null_value>
</attribute>
<attribute>
<name>column2</name>
<type>String</type>
<null_value>''</null_value>
</attribute>
</structure>
</dictionary>
</dictionaries>

View File

@ -3,12 +3,14 @@ import pytest
import os
import pymysql.cursors
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml'])
node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml', 'configs/dictionaries/postgres_odbc_hashed_dictionary.xml'])
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
@ -31,24 +33,49 @@ def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(table_name))
def get_postgres_conn():
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE SCHEMA {}".format(name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
print "sqlite data received"
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t1(x INTEGER PRIMARY KEY ASC, y, z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t4(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
conn = get_mysql_conn()
print "sqlite tables created"
mysql_conn = get_mysql_conn()
print "mysql connection received"
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
create_mysql_db(mysql_conn, 'clickhouse')
print "mysql database created"
postgres_conn = get_postgres_conn()
print "postgres connection received"
create_postgres_db(postgres_conn, 'clickhouse')
print "postgres db created"
cursor = postgres_conn.cursor()
cursor.execute("create table if not exists clickhouse.test_table (column1 int primary key, column2 varchar(40) not null)")
yield cluster
except Exception as ex:
print(ex)
raise ex
finally:
cluster.shutdown()
@ -141,3 +168,11 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
time.sleep(5)
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"
def test_postgres_odbc_hached_dictionary_with_schema(started_cluster):
conn = get_postgres_conn()
cursor = conn.cursor()
cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
time.sleep(5)
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"

View File

@ -6,6 +6,7 @@ hello
hello
hello
hello
1970-01-01 00:00:01
CREATE TABLE test.cast ( x UInt8, e Enum8('hello' = 1, 'world' = 2) DEFAULT CAST(x, 'Enum8(\'hello\' = 1, \'world\' = 2)')) ENGINE = MergeTree ORDER BY e SETTINGS index_granularity = 8192
x UInt8
e Enum8(\'hello\' = 1, \'world\' = 2) DEFAULT CAST(x, \'Enum8(\\\'hello\\\' = 1, \\\'world\\\' = 2)\')

View File

@ -15,6 +15,8 @@ SELECT cast(1 AS Enum8(
SELECT CAST(1, 'Enum8(\'hello\' = 1,\n\t\'world\' = 2)');
SELECT cast(1, 'Enum8(\'hello\' = 1,\n\t\'world\' = 2)');
SELECT toTimeZone(CAST(1 AS TIMESTAMP), 'UTC');
DROP TABLE IF EXISTS test.cast;
CREATE TABLE test.cast
(

View File

@ -0,0 +1,17 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.Issue_2231_Invalid_Nested_Columns_Size;
CREATE TABLE test.Issue_2231_Invalid_Nested_Columns_Size (
Date Date,
NestedColumn Nested(
ID Int32,
Count Int64
)
) Engine = MergeTree
PARTITION BY tuple()
ORDER BY Date;
INSERT INTO test.Issue_2231_Invalid_Nested_Columns_Size VALUES (today(), [2,2], [1]), (today(), [2,2], [1, 1]); -- { serverError 190 }
SELECT * FROM test.Issue_2231_Invalid_Nested_Columns_Size;
DROP TABLE test.Issue_2231_Invalid_Nested_Columns_Size;

View File

@ -0,0 +1,8 @@
background 1
foreground 1
heading 1
image 1
background 1
foreground 1
heading 1
image 1

View File

@ -0,0 +1,33 @@
SET send_logs_level = 'none';
USE test;
drop table if exists t1;
create table t1 (id UInt64, val Array(String),nid UInt64, eDate Date)ENGINE = MergeTree(eDate, (id, eDate), 8192);
insert into t1 (id,val,nid,eDate) values (1,['background','foreground','heading','image'],1,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (1,['background','foreground','heading','image'],1,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (2,['background','foreground','heading','image'],1,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (2,[],2,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (3,[],4,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (3,[],5,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (3,[],6,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (3,[],7,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (3,[],8,'2018-09-27');
select arrayJoin(val) as nameGroup6 from t1 prewhere notEmpty(toString(nameGroup6)) group by nameGroup6 order by nameGroup6; -- { serverError 182 }
select arrayJoin(val) as nameGroup6, countDistinct(nid) as rowids from t1 where notEmpty(toString(nameGroup6)) group by nameGroup6 order by nameGroup6;
select arrayJoin(val) as nameGroup6, countDistinct(nid) as rowids from t1 prewhere notEmpty(toString(nameGroup6)) group by nameGroup6 order by nameGroup6; -- { serverError 182 }
drop table t1;
create table t1 (id UInt64, val Array(String),nid UInt64, eDate Date) ENGINE = MergeTree(eDate, (id, eDate), 8192);
insert into t1 (id,val,nid,eDate) values (1,['background','foreground','heading','image'],1,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (1,['background','foreground','heading','image'],1,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (2,['background','foreground','heading','image'],1,'2018-09-27');
insert into t1 (id,val,nid,eDate) values (2,[],2,'2018-09-27');
select arrayJoin(val) as nameGroup6 from t1 prewhere notEmpty(toString(nameGroup6)) group by nameGroup6 order by nameGroup6; -- { serverError 182 }
select arrayJoin(val) as nameGroup6, countDistinct(nid) as rowids from t1 where notEmpty(toString(nameGroup6)) group by nameGroup6 order by nameGroup6;
select arrayJoin(val) as nameGroup6, countDistinct(nid) as rowids from t1 prewhere notEmpty(toString(nameGroup6)) group by nameGroup6 order by nameGroup6; -- { serverError 182 }
drop table t1;

View File

@ -0,0 +1,2 @@
2001-01-01 2.0000 0.00000000 -2.0000000000
2001-01-01 0.0000 1.00000000 0.0000000000

View File

@ -0,0 +1,27 @@
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.decimal_sum;
CREATE TABLE test.decimal_sum
(
date Date,
sum32 Decimal32(4),
sum64 Decimal64(8),
sum128 Decimal128(10)
) Engine = SummingMergeTree(date, (date), 8192);
INSERT INTO test.decimal_sum VALUES ('2001-01-01', 1, 1, -1);
INSERT INTO test.decimal_sum VALUES ('2001-01-01', 1, -1, -1);
OPTIMIZE TABLE test.decimal_sum;
SELECT * FROM test.decimal_sum;
INSERT INTO test.decimal_sum VALUES ('2001-01-01', -2, 1, 2);
OPTIMIZE TABLE test.decimal_sum;
SELECT * FROM test.decimal_sum;
INSERT INTO test.decimal_sum VALUES ('2001-01-01', 0, -1, 0);
OPTIMIZE TABLE test.decimal_sum;
SELECT * FROM test.decimal_sum;
drop table test.decimal_sum;

View File

@ -0,0 +1,16 @@
2001-02-03 04:05:06
2000-01-01 00:00:00
2001-02-03 04:05:06
2000-01-01 00:00:00
2001-02-03 04:05:06
2000-01-01 00:00:00
2001-02-03 04:05:06
2000-01-01 00:00:00
2001-02-03
2000-01-01
2001-02-03
2000-01-01
2001-02-03
2000-01-01
2001-02-03
2000-01-01

Some files were not shown because too many files have changed in this diff.