Merge branch 'master' into table-constraints

Gleb Novikov committed on 2019-05-19 09:03:04 +03:00
Commit 2e7b6af137
131 changed files with 4284 additions and 1359 deletions

.gitmodules

@@ -82,3 +82,6 @@
 [submodule "contrib/simdjson"]
     path = contrib/simdjson
     url = https://github.com/lemire/simdjson.git
+[submodule "contrib/rapidjson"]
+    path = contrib/rapidjson
+    url = https://github.com/Tencent/rapidjson


@@ -328,6 +328,7 @@ include (cmake/find_base64.cmake)
 include (cmake/find_hyperscan.cmake)
 include (cmake/find_lfalloc.cmake)
 include (cmake/find_simdjson.cmake)
+include (cmake/find_rapidjson.cmake)
 find_contrib_lib(cityhash)
 find_contrib_lib(farmhash)
 find_contrib_lib(metrohash)


@@ -0,0 +1,9 @@
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/rapidjson/include/rapidjson/rapidjson.h")
message (WARNING "submodule contrib/rapidjson is missing. to fix try run: \n git submodule update --init --recursive")
return()
endif ()
option (USE_RAPIDJSON "Use rapidjson" ON)
set (RAPIDJSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rapidjson/include")
message(STATUS "Using rapidjson=${USE_RAPIDJSON}: ${RAPIDJSON_INCLUDE_DIR}")

contrib/rapidjson (new submodule)

@@ -0,0 +1 @@
Subproject commit 01950eb7acec78818d68b762efc869bba2420d82

contrib/simdjson (submodule updated)

@@ -1 +1 @@
-Subproject commit 681cd3369860f4eada49a387cbff93030f759c95
+Subproject commit 14cd1f7a0b0563db78bda8053a9f6ac2ea95a441


@@ -367,6 +367,6 @@ if (ENABLE_TESTS AND USE_GTEST)
     # attach all dbms gtest sources
     grep_gtest_sources(${ClickHouse_SOURCE_DIR}/dbms dbms_gtest_sources)
     add_executable(unit_tests_dbms ${dbms_gtest_sources})
-    target_link_libraries(unit_tests_dbms PRIVATE ${GTEST_BOTH_LIBRARIES} dbms clickhouse_common_zookeeper)
+    target_link_libraries(unit_tests_dbms PRIVATE ${GTEST_BOTH_LIBRARIES} clickhouse_functions clickhouse_parsers dbms clickhouse_common_zookeeper)
     add_check(unit_tests_dbms)
 endif ()


@@ -43,8 +43,12 @@ template <typename Value, bool FloatReturn> using FuncQuantilesTDigestWeighted =
 template <template <typename, bool> class Function>
 static constexpr bool supportDecimal()
 {
-    return std::is_same_v<Function<Float32, false>, FuncQuantileExact<Float32, false>> ||
-        std::is_same_v<Function<Float32, false>, FuncQuantilesExact<Float32, false>>;
+    return std::is_same_v<Function<Float32, false>, FuncQuantile<Float32, false>> ||
+        std::is_same_v<Function<Float32, false>, FuncQuantiles<Float32, false>> ||
+        std::is_same_v<Function<Float32, false>, FuncQuantileExact<Float32, false>> ||
+        std::is_same_v<Function<Float32, false>, FuncQuantilesExact<Float32, false>> ||
+        std::is_same_v<Function<Float32, false>, FuncQuantileExactWeighted<Float32, false>> ||
+        std::is_same_v<Function<Float32, false>, FuncQuantilesExactWeighted<Float32, false>>;
 }

@@ -66,9 +70,9 @@ AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, c
     if constexpr (supportDecimal<Function>())
     {
-        if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, true>>(argument_type, params);
-        if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, true>>(argument_type, params);
-        if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, true>>(argument_type, params);
+        if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, false>>(argument_type, params);
+        if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, false>>(argument_type, params);
+        if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, false>>(argument_type, params);
     }
     throw Exception("Illegal type " + argument_type->getName() + " of argument for aggregate function " + name,


@@ -2,6 +2,10 @@
 #include <AggregateFunctions/AggregateFunctionWindowFunnel.h>
 #include <AggregateFunctions/Helpers.h>
 #include <AggregateFunctions/FactoryHelpers.h>
+#include <DataTypes/DataTypeDate.h>
+#include <DataTypes/DataTypeDateTime.h>
+#include <ext/range.h>

 namespace DB

@@ -10,6 +14,7 @@ namespace DB
 namespace
 {

+template <template <typename> class Data>
 AggregateFunctionPtr createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes & arguments, const Array & params)
 {
     if (params.size() != 1)

@@ -18,17 +23,36 @@ AggregateFunctionPtr createAggregateFunctionWindowFunnel(const std::string & nam
     if (arguments.size() < 2)
         throw Exception("Aggregate function " + name + " requires one timestamp argument and at least one event condition.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

-    if (arguments.size() > AggregateFunctionWindowFunnelData::max_events + 1)
+    if (arguments.size() > max_events + 1)
         throw Exception("Too many event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

-    return std::make_shared<AggregateFunctionWindowFunnel>(arguments, params);
+    for (const auto i : ext::range(1, arguments.size()))
+    {
+        auto cond_arg = arguments[i].get();
+        if (!isUInt8(cond_arg))
+            throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) + " of aggregate function "
+                + name + ", must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
+    }
+
+    AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionWindowFunnel, Data>(*arguments[0], arguments, params));
+    WhichDataType which(arguments.front().get());
+    if (res)
+        return res;
+    else if (which.isDate())
+        return std::make_shared<AggregateFunctionWindowFunnel<DataTypeDate::FieldType, Data<DataTypeDate::FieldType>>>(arguments, params);
+    else if (which.isDateTime())
+        return std::make_shared<AggregateFunctionWindowFunnel<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType>>>(arguments, params);
+
+    throw Exception{"Illegal type " + arguments.front().get()->getName()
+        + " of first argument of aggregate function " + name + ", must be Unsigned Number, Date, DateTime",
+        ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
 }

 }

 void registerAggregateFunctionWindowFunnel(AggregateFunctionFactory & factory)
 {
-    factory.registerFunction("windowFunnel", createAggregateFunctionWindowFunnel, AggregateFunctionFactory::CaseInsensitive);
+    factory.registerFunction("windowFunnel", createAggregateFunctionWindowFunnel<AggregateFunctionWindowFunnelData>, AggregateFunctionFactory::CaseInsensitive);
 }

 }


@@ -9,18 +9,15 @@
 #include <IO/ReadHelpers.h>
 #include <IO/WriteHelpers.h>
 #include <Common/ArenaAllocator.h>
-#include <ext/range.h>
 #include <AggregateFunctions/IAggregateFunction.h>

 namespace DB
 {

 namespace ErrorCodes
 {
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
-    extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
     extern const int ILLEGAL_TYPE_OF_ARGUMENT;
 }

@@ -33,10 +30,11 @@ struct ComparePairFirst final
     }
 };

+static constexpr auto max_events = 32;
+
+template <typename T>
 struct AggregateFunctionWindowFunnelData
 {
-    static constexpr auto max_events = 32;
-    using TimestampEvent = std::pair<UInt32, UInt8>;
+    using TimestampEvent = std::pair<T, UInt8>;
     static constexpr size_t bytes_on_stack = 64;
     using TimestampEvents = PODArray<TimestampEvent, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;

@@ -51,7 +49,7 @@ struct AggregateFunctionWindowFunnelData
         return events_list.size();
     }

-    void add(UInt32 timestamp, UInt8 event)
+    void add(T timestamp, UInt8 event)
     {
         // Since most events should have already been sorted by timestamp.
         if (sorted && events_list.size() > 0 && events_list.back().first > timestamp)

@@ -119,7 +117,7 @@ struct AggregateFunctionWindowFunnelData
         events_list.clear();
         events_list.reserve(size);

-        UInt32 timestamp;
+        T timestamp;
         UInt8 event;

         for (size_t i = 0; i < size; ++i)

@@ -137,11 +135,12 @@ struct AggregateFunctionWindowFunnelData
  * Usage:
  * - windowFunnel(window)(timestamp, cond1, cond2, cond3, ....)
  */
+template <typename T, typename Data>
 class AggregateFunctionWindowFunnel final
-    : public IAggregateFunctionDataHelper<AggregateFunctionWindowFunnelData, AggregateFunctionWindowFunnel>
+    : public IAggregateFunctionDataHelper<Data, AggregateFunctionWindowFunnel<T, Data>>
 {
 private:
-    UInt32 window;
+    UInt64 window;
     UInt8 events_size;

@@ -149,22 +148,24 @@ private:
     // The level path must be 1---2---3---...---check_events_size, find the max event level that statisfied the path in the sliding window.
     // If found, returns the max event level, else return 0.
     // The Algorithm complexity is O(n).
-    UInt8 getEventLevel(const AggregateFunctionWindowFunnelData & data) const
+    UInt8 getEventLevel(const Data & data) const
     {
         if (data.size() == 0)
             return 0;
         if (events_size == 1)
             return 1;

-        const_cast<AggregateFunctionWindowFunnelData &>(data).sort();
+        const_cast<Data &>(data).sort();

-        // events_timestamp stores the timestamp that latest i-th level event happen withing time window after previous level event.
-        // timestamp defaults to -1, which unsigned timestamp value never meet
-        std::vector<Int32> events_timestamp(events_size, -1);
+        /// events_timestamp stores the timestamp that latest i-th level event happen withing time window after previous level event.
+        /// timestamp defaults to -1, which unsigned timestamp value never meet
+        /// there may be some bugs when UInt64 type timstamp overflows Int64, but it works on most cases.
+        std::vector<Int64> events_timestamp(events_size, -1);

         for (const auto & pair : data.events_list)
         {
-            const auto & timestamp = pair.first;
+            const T & timestamp = pair.first;
             const auto & event_idx = pair.second - 1;
             if (event_idx == 0)
                 events_timestamp[0] = timestamp;
             else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window)

@@ -189,22 +190,8 @@ public:
     }

     AggregateFunctionWindowFunnel(const DataTypes & arguments, const Array & params)
-        : IAggregateFunctionDataHelper<AggregateFunctionWindowFunnelData, AggregateFunctionWindowFunnel>(arguments, params)
+        : IAggregateFunctionDataHelper<Data, AggregateFunctionWindowFunnel<T, Data>>(arguments, params)
     {
-        const auto time_arg = arguments.front().get();
-        if (!WhichDataType(time_arg).isDateTime() && !WhichDataType(time_arg).isUInt32())
-            throw Exception{"Illegal type " + time_arg->getName() + " of first argument of aggregate function " + getName()
-                + ", must be DateTime or UInt32", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
-
-        for (const auto i : ext::range(1, arguments.size()))
-        {
-            auto cond_arg = arguments[i].get();
-            if (!isUInt8(cond_arg))
-                throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) + " of aggregate function "
-                    + getName() + ", must be UInt8",
-                    ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
-        }
-
         events_size = arguments.size() - 1;
         window = params.at(0).safeGet<UInt64>();
     }

@@ -217,7 +204,7 @@ public:
     void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
     {
-        const auto timestamp = static_cast<const ColumnVector<UInt32> *>(columns[0])->getData()[row_num];
+        const auto timestamp = static_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
         // reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
         for (auto i = events_size; i > 0; --i)
         {
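The getEventLevel() logic above is easier to follow outside the aggregate-function plumbing. A minimal self-contained sketch of the same sliding-window idea, using plain standard types (function and parameter names here are illustrative, not ClickHouse code):

#include <cstdint>
#include <utility>
#include <vector>

// Standalone illustration: given (timestamp, event_index) pairs sorted by timestamp,
// with event_index in [1, levels], return the deepest funnel level reached where each
// step happens within `window` of the previous one.
uint32_t maxFunnelLevel(const std::vector<std::pair<uint64_t, uint32_t>> & sorted_events,
                        uint64_t window, uint32_t levels)
{
    // Latest timestamp at which level i (0-based) was reached; -1 means "not reached yet".
    std::vector<int64_t> level_time(levels, -1);
    for (const auto & [timestamp, event] : sorted_events)
    {
        const uint32_t idx = event - 1;
        if (idx == 0)
            level_time[0] = static_cast<int64_t>(timestamp);
        else if (level_time[idx - 1] >= 0 && timestamp <= static_cast<uint64_t>(level_time[idx - 1]) + window)
        {
            level_time[idx] = static_cast<int64_t>(timestamp);
            if (idx + 1 == levels)
                return levels;   // full funnel reached, can stop early
        }
    }
    for (uint32_t i = levels; i > 0; --i)
        if (level_time[i - 1] >= 0)
            return i;
    return 0;
}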


@@ -20,12 +20,22 @@ namespace ErrorCodes
 template <typename Value>
 struct QuantileExactWeighted
 {
+    struct Int128Hash
+    {
+        size_t operator()(Int128 x) const
+        {
+            return CityHash_v1_0_2::Hash128to64({x >> 64, x & 0xffffffffffffffffll});
+        }
+    };
+
     using Weight = UInt64;
+    using UnderlyingType = typename NativeType<Value>::Type;
+    using Hasher = std::conditional_t<std::is_same_v<Value, Decimal128>, Int128Hash, HashCRC32<UnderlyingType>>;

     /// When creating, the hash table must be small.
     using Map = HashMap<
-        Value, Weight,
-        HashCRC32<Value>,
+        UnderlyingType, Weight,
+        Hasher,
         HashTableGrower<4>,
         HashTableAllocatorWithStackMemory<sizeof(std::pair<Value, Weight>) * (1 << 3)>
     >;

@@ -39,7 +49,7 @@ struct QuantileExactWeighted
         ++map[x];
     }

-    void add(const Value & x, const Weight & weight)
+    void add(const Value & x, Weight weight)
     {
         if (!isNaN(x))
             map[x] += weight;
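Int128Hash above folds the 128-bit key into 64 bits by hashing its two halves with CityHash's Hash128to64, because the CRC32-based hasher only covers native-width keys. A self-contained sketch of the same folding idea, with std::hash standing in for CityHash so it compiles on its own:

#include <cstdint>
#include <cstddef>
#include <functional>

// Illustrative stand-in for a 128-bit hash: split the value into two 64-bit halves
// and combine their hashes. The diff uses CityHash_v1_0_2::Hash128to64 for the combine
// step; std::hash plus a mix constant is used here only to keep the sketch self-contained.
// (unsigned __int128 is a GCC/Clang extension.)
struct UInt128HashSketch
{
    size_t operator()(unsigned __int128 x) const
    {
        const uint64_t hi = static_cast<uint64_t>(x >> 64);
        const uint64_t lo = static_cast<uint64_t>(x);
        const size_t h1 = std::hash<uint64_t>{}(hi);
        const size_t h2 = std::hash<uint64_t>{}(lo);
        return h1 ^ (h2 + 0x9e3779b97f4a7c15ULL + (h1 << 6) + (h1 >> 2));  // boost-style combine
    }
};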


@@ -12,8 +12,7 @@
 #include <Core/Protocol.h>
 #include <Core/QueryProcessingStage.h>
-#include <DataStreams/IBlockInputStream.h>
-#include <DataStreams/IBlockOutputStream.h>
+#include <DataStreams/IBlockStream_fwd.h>
 #include <DataStreams/BlockStreamProfileInfo.h>
 #include <IO/ConnectionTimeouts.h>


@@ -1430,6 +1430,8 @@ void ZooKeeper::pushRequest(RequestInfo && info)
     if (!info.request->xid)
     {
         info.request->xid = next_xid.fetch_add(1);
+        if (info.request->xid == close_xid)
+            throw Exception("xid equal to close_xid", ZSESSIONEXPIRED);
         if (info.request->xid < 0)
             throw Exception("XID overflow", ZSESSIONEXPIRED);
     }


@@ -26,6 +26,7 @@
 #cmakedefine01 USE_SSL
 #cmakedefine01 USE_HYPERSCAN
 #cmakedefine01 USE_SIMDJSON
+#cmakedefine01 USE_RAPIDJSON
 #cmakedefine01 USE_LFALLOC
 #cmakedefine01 USE_LFALLOC_RANDOM_HINT


@@ -430,6 +430,13 @@ inline bool_if_safe_conversion<A, B> greaterOrEqualsOp(A a, B b)
 template <typename From, typename To>
 inline bool NO_SANITIZE_UNDEFINED convertNumeric(From value, To & result)
 {
+    /// If the type is actually the same it's not necessary to do any checks.
+    if constexpr (std::is_same_v<From, To>)
+    {
+        result = value;
+        return true;
+    }
+
     /// Note that NaNs doesn't compare equal to anything, but they are still in range of any Float type.
     if (isNaN(value) && std::is_floating_point_v<To>)
     {
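The new branch skips NaN and range checks entirely when From and To are the same type. A standalone sketch of a checked conversion with that fast path, restricted to integers and using C++20 std::in_range for the range test (illustrative names, not the real convertNumeric):

#include <cstdint>
#include <type_traits>
#include <utility>      // std::in_range (C++20)

// Sketch only: checked integer conversion with a same-type fast path. Distinct integer
// types are range-checked with std::in_range, which handles signed/unsigned mismatches.
template <typename From, typename To>
bool convertIntegerChecked(From value, To & result)
{
    static_assert(std::is_integral_v<From> && std::is_integral_v<To>);
    if constexpr (std::is_same_v<From, To>)
    {
        result = value;   // identical types: nothing can go wrong, skip all checks
        return true;
    }
    else
    {
        if (!std::in_range<To>(value))
            return false;
        result = static_cast<To>(value);
        return true;
    }
}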


@@ -323,8 +323,9 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \
     \
     M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \
+    M(SettingBool, allow_simdjson, 1, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.") \
     \
     M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.")

 DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)


@@ -165,6 +165,11 @@ template <> constexpr bool IsDecimalNumber<Decimal32> = true;
 template <> constexpr bool IsDecimalNumber<Decimal64> = true;
 template <> constexpr bool IsDecimalNumber<Decimal128> = true;

+template <typename T> struct NativeType { using Type = T; };
+template <> struct NativeType<Decimal32> { using Type = Int32; };
+template <> struct NativeType<Decimal64> { using Type = Int64; };
+template <> struct NativeType<Decimal128> { using Type = Int128; };
+
 }

 /// Specialization of `std::hash` for the Decimal<T> types.
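NativeType maps a Decimal wrapper to the plain integer underneath it and is the identity for everything else; QuantileExactWeighted above uses it to pick a hash-table key type. A self-contained sketch of the same trait pattern with toy names:

#include <cstdint>
#include <type_traits>

// Toy stand-ins, only for illustration: a Decimal-like wrapper around an integer.
struct ToyDecimal64 { int64_t value; };

// Identity by default, specialised for wrapper types (mirrors NativeType in the diff).
template <typename T> struct ToyNativeType { using Type = T; };
template <> struct ToyNativeType<ToyDecimal64> { using Type = int64_t; };

// A container keyed by the underlying type can now accept both plain and wrapped values.
template <typename Value>
struct KeyedByUnderlying
{
    using Key = typename ToyNativeType<Value>::Type;
    static_assert(std::is_arithmetic_v<Key>, "hash-table key must be a plain arithmetic type");
};

static_assert(std::is_same_v<KeyedByUnderlying<uint32_t>::Key, uint32_t>);
static_assert(std::is_same_v<KeyedByUnderlying<ToyDecimal64>::Key, int64_t>);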


@@ -1,7 +1,8 @@
 #pragma once

-#include <DataStreams/IBlockInputStream.h>
-#include <DataStreams/IBlockOutputStream.h>
+#include <DataStreams/IBlockStream_fwd.h>
+#include <functional>

 namespace DB


@@ -1,8 +1,10 @@
 #pragma once

-#include <vector>
-#include <Common/Stopwatch.h>
 #include <Core/Types.h>
+#include <DataStreams/IBlockStream_fwd.h>
+#include <Common/Stopwatch.h>
+#include <vector>

 namespace DB
 {

@@ -10,7 +12,6 @@ namespace DB
 class Block;
 class ReadBuffer;
 class WriteBuffer;
-class IBlockInputStream;

 /// Information for profiling. See IBlockInputStream.h
 struct BlockStreamProfileInfo


@@ -9,6 +9,8 @@ ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr
     : expression(expression_)
 {
     children.push_back(input);
+    cached_header = children.back()->getHeader();
+    expression->execute(cached_header, true);
 }

 String ExpressionBlockInputStream::getName() const { return "Expression"; }

@@ -23,9 +25,7 @@ Block ExpressionBlockInputStream::getTotals()
 Block ExpressionBlockInputStream::getHeader() const
 {
-    Block res = children.back()->getHeader();
-    expression->execute(res, true);
-    return res;
+    return cached_header.cloneEmpty();
 }

 Block ExpressionBlockInputStream::readImpl()
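The constructor now runs the expression once to learn the output structure, and getHeader() hands out cheap empty clones of that cached block instead of re-executing the expression per call. A minimal standalone sketch of the compute-once pattern (toy types, not the ClickHouse API):

#include <string>
#include <utility>
#include <vector>

// Illustrative only: an expensive structure computation done once at construction time,
// then served from a cached copy, mirroring cached_header in the hunk above.
struct ToyHeader
{
    std::vector<std::string> column_names;
    ToyHeader cloneEmpty() const { return ToyHeader{column_names}; }  // structure only, no data
};

class ToyExpressionStream
{
public:
    explicit ToyExpressionStream(ToyHeader input_header)
        : cached_header(std::move(input_header))
    {
        // Run the (potentially expensive) expression once to learn the output structure.
        cached_header.column_names.push_back("computed_column");
    }

    // Every later call is just a cheap copy of the cached structure.
    ToyHeader getHeader() const { return cached_header.cloneEmpty(); }

private:
    ToyHeader cached_header;
};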


@@ -30,6 +30,7 @@ protected:
 private:
     ExpressionActionsPtr expression;
+    Block cached_header;
 };

 }


@@ -2,6 +2,7 @@
 #include <Core/Block.h>
 #include <Core/SortDescription.h>
+#include <DataStreams/IBlockStream_fwd.h>
 #include <DataStreams/BlockStreamProfileInfo.h>
 #include <DataStreams/SizeLimits.h>
 #include <IO/Progress.h>

@@ -21,14 +22,10 @@ namespace ErrorCodes
     extern const int QUERY_WAS_CANCELLED;
 }

-class IBlockInputStream;
 class ProcessListElement;
 class QuotaForIntervals;
 class QueryStatus;

-using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
-using BlockInputStreams = std::vector<BlockInputStreamPtr>;
-
 /** Callback to track the progress of the query.
  * Used in IBlockInputStream and Context.
  * The function takes the number of rows in the last block, the number of bytes in the last block.


@@ -1,11 +1,14 @@
 #pragma once

+#include <Core/Block.h>
+#include <DataStreams/IBlockStream_fwd.h>
+#include <Storages/TableStructureLockHolder.h>
+#include <boost/noncopyable.hpp>
+#include <memory>
 #include <string>
 #include <vector>
-#include <memory>
-#include <boost/noncopyable.hpp>
-#include <Core/Block.h>
-#include <Storages/TableStructureLockHolder.h>

 namespace DB

@@ -64,6 +67,4 @@ private:
     std::vector<TableStructureReadLockHolder> table_locks;
 };

-using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
 }


@@ -0,0 +1,16 @@
#pragma once
#include <memory>
#include <vector>
namespace DB
{
class IBlockInputStream;
class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
}
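This forward-declaration header lets other headers pass stream pointers around without pulling in the full IBlockInputStream/IBlockOutputStream definitions, which trims include graphs and rebuild times. A self-contained sketch of the pattern with toy names, one file standing in for the _fwd header, the consumer, and the implementation:

#include <memory>
#include <vector>

// --- what would live in Widget_fwd.h: declarations only, no heavy includes ---
class Widget;                                      // forward declaration is enough here
using WidgetPtr = std::shared_ptr<Widget>;
using Widgets = std::vector<WidgetPtr>;

// --- a consumer header can declare interfaces in terms of WidgetPtr alone ---
Widgets filterReady(const Widgets & input);        // no full Widget definition required

// --- only the implementation file pulls in the real definition ---
class Widget
{
public:
    bool ready = false;
};

Widgets filterReady(const Widgets & input)
{
    Widgets result;
    for (const auto & w : input)
        if (w && w->ready)                         // full definition needed only here
            result.push_back(w);
    return result;
}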


@@ -8,7 +8,7 @@
 #include <common/logger_useful.h>
-#include <DataStreams/IBlockInputStream.h>
+#include <DataStreams/IBlockStream_fwd.h>
 #include <Common/setThreadName.h>
 #include <Common/CurrentMetrics.h>
 #include <Common/CurrentThread.h>


@@ -1,5 +1,7 @@
 #pragma once

+#include <DataStreams/IBlockStream_fwd.h>
+
 #include <atomic>
 #include <functional>

@@ -7,8 +9,6 @@
 namespace DB
 {

-class IBlockInputStream;
-class IBlockOutputStream;
 class Block;

 /** Copies data from the InputStream into the OutputStream


@@ -4,7 +4,7 @@
 #include <unordered_set>
 #include <Databases/DatabasesCommon.h>
 #include <Databases/IDatabase.h>
-#include <Storages/IStorage.h>
+#include <Storages/IStorage_fwd.h>

 namespace Poco


@@ -1,27 +1,28 @@
 #include <iomanip>
-#include <Poco/Event.h>
-#include <Poco/DirectoryIterator.h>
-#include <common/logger_useful.h>
-#include <Databases/DatabaseOrdinary.h>
-#include <Databases/DatabaseMemory.h>
-#include <Databases/DatabasesCommon.h>
-#include <Common/typeid_cast.h>
-#include <Common/escapeForFileName.h>
-#include <Common/StringUtils/StringUtils.h>
-#include <Common/Stopwatch.h>
-#include <Common/ThreadPool.h>
-#include <Parsers/ASTCreateQuery.h>
-#include <Parsers/parseQuery.h>
-#include <Parsers/ParserCreateQuery.h>
-#include <Interpreters/Context.h>
 #include <Core/Settings.h>
-#include <Interpreters/InterpreterCreateQuery.h>
-#include <IO/WriteBufferFromFile.h>
+#include <Databases/DatabaseMemory.h>
+#include <Databases/DatabaseOrdinary.h>
+#include <Databases/DatabasesCommon.h>
 #include <IO/ReadBufferFromFile.h>
-#include <IO/WriteHelpers.h>
 #include <IO/ReadHelpers.h>
+#include <IO/WriteBufferFromFile.h>
+#include <IO/WriteHelpers.h>
+#include <Interpreters/Context.h>
+#include <Interpreters/InterpreterCreateQuery.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ParserCreateQuery.h>
+#include <Parsers/parseQuery.h>
+#include <Storages/IStorage.h>
+#include <Poco/DirectoryIterator.h>
+#include <Poco/Event.h>
+#include <Common/Stopwatch.h>
+#include <Common/StringUtils/StringUtils.h>
+#include <Common/ThreadPool.h>
+#include <Common/escapeForFileName.h>
+#include <Common/typeid_cast.h>
+#include <common/logger_useful.h>
 #include <ext/scope_guard.h>


@@ -1,14 +1,16 @@
-#include <sstream>
-#include <Common/typeid_cast.h>
-#include <Parsers/parseQuery.h>
-#include <Parsers/ParserCreateQuery.h>
-#include <Parsers/ASTCreateQuery.h>
-#include <Parsers/formatAST.h>
+#include <Databases/DatabasesCommon.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/InterpreterCreateQuery.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ParserCreateQuery.h>
+#include <Parsers/formatAST.h>
+#include <Parsers/parseQuery.h>
+#include <Storages/IStorage.h>
 #include <Storages/StorageFactory.h>
-#include <Databases/DatabasesCommon.h>
+#include <Common/typeid_cast.h>
+#include <sstream>

 namespace DB


@@ -2,7 +2,7 @@
 #include <Core/Types.h>
 #include <Parsers/IAST.h>
-#include <Storages/IStorage.h>
+#include <Storages/IStorage_fwd.h>
 #include <Databases/IDatabase.h>


@@ -6,6 +6,7 @@
 #include <Parsers/IAST_fwd.h>
 #include <Storages/ColumnsDescription.h>
 #include <Storages/IndicesDescription.h>
+#include <Storages/IStorage_fwd.h>
 #include <Poco/File.h>
 #include <Common/ThreadPool.h>
 #include <Common/escapeForFileName.h>

@@ -20,9 +21,6 @@ namespace DB
 class Context;

-class IStorage;
-using StoragePtr = std::shared_ptr<IStorage>;
-
 struct Settings;


@@ -3,6 +3,7 @@
 #include <Columns/ColumnsNumber.h>
 #include <Common/ProfilingScopedRWLock.h>
 #include <Common/typeid_cast.h>
+#include <DataStreams/IBlockInputStream.h>
 #include <ext/map.h>
 #include <ext/range.h>
 #include <ext/size.h>


@@ -20,6 +20,7 @@
 #include "DictionaryStructure.h"
 #include "IDictionary.h"
 #include "IDictionarySource.h"
+#include <DataStreams/IBlockInputStream.h>

 namespace ProfileEvents


@@ -7,6 +7,7 @@
 #include <Columns/ColumnString.h>
 #include <Common/Arena.h>
 #include <Common/HashTable/HashMap.h>
+#include <Core/Block.h>
 #include <common/StringRef.h>
 #include <ext/range.h>
 #include "DictionaryStructure.h"


@@ -1,6 +1,7 @@
 #pragma once

 #include "IDictionarySource.h"
+#include <Core/Block.h>

 #include <unordered_map>
 #include <ext/singleton.h>


@@ -2,12 +2,10 @@
 #include "DictionaryStructure.h"
 #include "IDictionarySource.h"
-#include <Core/Block.h>

-namespace Poco
-{
-    class Logger;
-}
+namespace Poco { class Logger; }

 namespace DB


@@ -2,6 +2,7 @@
 #include <Poco/Timestamp.h>
 #include "IDictionarySource.h"
+#include <Core/Block.h>

 namespace DB


@@ -6,6 +6,7 @@
 #include <Columns/ColumnDecimal.h>
 #include <Columns/ColumnString.h>
 #include <Common/Arena.h>
+#include <Core/Block.h>
 #include <ext/range.h>
 #include <ext/size.h>
 #include "DictionaryStructure.h"


@@ -5,6 +5,7 @@
 #include <variant>
 #include <Columns/ColumnDecimal.h>
 #include <Columns/ColumnString.h>
+#include <Core/Block.h>
 #include <Common/HashTable/HashMap.h>
 #include <ext/range.h>
 #include "DictionaryStructure.h"


@@ -1,27 +1,27 @@
 #pragma once

-#include <chrono>
-#include <memory>
 #include <Core/Field.h>
 #include <Core/Names.h>
+#include <DataStreams/IBlockStream_fwd.h>
 #include <Interpreters/IExternalLoadable.h>
 #include <Poco/Util/XMLConfiguration.h>
 #include <Common/PODArray.h>
 #include <common/StringRef.h>
 #include "IDictionarySource.h"
+#include <chrono>
+#include <memory>

 namespace DB
 {
 struct IDictionaryBase;
 using DictionaryPtr = std::unique_ptr<IDictionaryBase>;

 struct DictionaryStructure;
 class ColumnString;

-class IBlockInputStream;
-using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
-
 struct IDictionaryBase : public IExternalLoadable
 {
     using Key = UInt64;


@@ -1,7 +1,9 @@
 #pragma once

+#include <Columns/IColumn.h>
+#include <DataStreams/IBlockStream_fwd.h>
 #include <vector>
-#include <DataStreams/IBlockInputStream.h>

 namespace DB


@@ -1,6 +1,7 @@
 #pragma once

 #include <Common/config.h>
+#include <Core/Block.h>

 #if USE_POCO_MONGODB
 # include "DictionaryStructure.h"


@@ -1,6 +1,7 @@
 #pragma once

 #include <Common/config.h>
+#include <Core/Block.h>

 #if USE_MYSQL
 # include <common/LocalDateTime.h>


@@ -1,11 +1,12 @@
 #pragma once

+#include <DataStreams/IBlockStream_fwd.h>
 #include <string>

 namespace DB
 {

-class IBlockInputStream;
 /// Using in MySQLDictionarySource and XDBCDictionarySource after processing invalidate_query.
 std::string readInvalidateQuery(IBlockInputStream & block_input_stream);


@@ -1,10 +1,12 @@
 #pragma once

-#include <memory>
-#include <functional>
-#include <unordered_map>
-#include <ext/singleton.h>
 #include <Core/Types.h>
+#include <DataStreams/IBlockStream_fwd.h>
+#include <ext/singleton.h>
+#include <functional>
+#include <memory>
+#include <unordered_map>

 namespace DB

@@ -17,13 +19,6 @@ struct FormatSettings;
 class ReadBuffer;
 class WriteBuffer;

-class IBlockInputStream;
-class IBlockOutputStream;
-
-using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
-using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
-
 /** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
  * Note: format and compression are independent things.
  */


@@ -73,3 +73,7 @@ endif()
 if(USE_SIMDJSON)
     target_link_libraries(clickhouse_functions PRIVATE ${SIMDJSON_LIBRARY})
 endif()
+
+if(USE_RAPIDJSON)
+    target_include_directories(clickhouse_functions SYSTEM PRIVATE ${RAPIDJSON_INCLUDE_DIR})
+endif()


@@ -0,0 +1,57 @@
#pragma once
#include <common/StringRef.h>
#include <Common/Exception.h>
#include <Core/Types.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// This class can be used as an argument for the template class FunctionJSON when we unable to parse JSONs.
/// It can't do anything useful and just throws an exception.
struct DummyJSONParser
{
static constexpr bool need_preallocate = false;
void preallocate(size_t) {}
bool parse(const StringRef &) { throw Exception{"Functions JSON* are not supported without AVX2", ErrorCodes::NOT_IMPLEMENTED}; }
using Iterator = std::nullptr_t;
Iterator getRoot() const { return nullptr; }
static bool isInt64(const Iterator &) { return false; }
static bool isUInt64(const Iterator &) { return false; }
static bool isDouble(const Iterator &) { return false; }
static bool isString(const Iterator &) { return false; }
static bool isArray(const Iterator &) { return false; }
static bool isObject(const Iterator &) { return false; }
static bool isBool(const Iterator &) { return false; }
static bool isNull(const Iterator &) { return true; }
static Int64 getInt64(const Iterator &) { return 0; }
static UInt64 getUInt64(const Iterator &) { return 0; }
static double getDouble(const Iterator &) { return 0; }
static bool getBool(const Iterator &) { return false; }
static StringRef getString(const Iterator &) { return {}; }
static size_t sizeOfArray(const Iterator &) { return 0; }
static bool firstArrayElement(Iterator &) { return false; }
static bool arrayElementByIndex(Iterator &, size_t) { return false; }
static bool nextArrayElement(Iterator &) { return false; }
static size_t sizeOfObject(const Iterator &) { return 0; }
static bool firstObjectMember(Iterator &) { return false; }
static bool firstObjectMember(Iterator &, StringRef &) { return false; }
static bool objectMemberByIndex(Iterator &, size_t) { return false; }
static bool objectMemberByName(Iterator &, const StringRef &) { return false; }
static bool nextObjectMember(Iterator &) { return false; }
static bool nextObjectMember(Iterator &, StringRef &) { return false; }
static bool isObjectMember(const Iterator &) { return false; }
static StringRef getKey(const Iterator &) { return {}; }
};
}
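DummyJSONParser documents the static-style interface that FunctionJSON expects from a parser template argument; the dummy merely satisfies it and throws on parse. A hypothetical sketch of a generic caller written against that shape, with a toy parser in place of simdjson/rapidjson (all names here are illustrative, not the real FunctionJSON):

#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <optional>
#include <string>

// Toy parser exposing the same shape of interface as the parser structs above,
// restricted to a tiny subset (parse an integer literal as the "document root").
struct ToyParser
{
    static constexpr bool need_preallocate = false;
    void preallocate(size_t) {}
    bool parse(const std::string & json) { value = std::strtoll(json.c_str(), nullptr, 10); return true; }

    using Iterator = const int64_t *;
    Iterator getRoot() const { return &value; }

    static bool isInt64(const Iterator &) { return true; }
    static int64_t getInt64(const Iterator & it) { return *it; }

private:
    int64_t value = 0;
};

// Generic code in the style of FunctionJSON: it never names a concrete JSON library,
// so swapping simdjson, rapidjson, or the dummy fallback is a template-argument change.
template <typename Parser>
std::optional<int64_t> extractRootInt(const std::string & json)
{
    Parser parser;
    if constexpr (Parser::need_preallocate)
        parser.preallocate(json.size());
    if (!parser.parse(json))
        return std::nullopt;
    auto it = parser.getRoot();
    if (!Parser::isInt64(it))
        return std::nullopt;
    return Parser::getInt64(it);
}

Registering the dummy parser when no JSON library is available keeps the JSON* function names defined while making every call fail with a clear "not supported" error, which is exactly what DummyJSONParser::parse does.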


@@ -98,11 +98,6 @@ template <typename, typename> struct GreatestBaseImpl;
 template <typename, typename> struct ModuloImpl;

-template <typename T> struct NativeType { using Type = T; };
-template <> struct NativeType<Decimal32> { using Type = Int32; };
-template <> struct NativeType<Decimal64> { using Type = Int64; };
-template <> struct NativeType<Decimal128> { using Type = Int128; };
-
 /// Binary operations for Decimals need scale args
 /// +|- scale one of args (which scale factor is not 1). ScaleR = oneof(Scale1, Scale2);
 /// * no agrs scale. ScaleR = Scale1 + Scale2;


@@ -1,11 +1,10 @@
 #include <Functions/IFunction.h>
+#include <Storages/IStorage_fwd.h>
 #include <Storages/TableStructureLockHolder.h>

 namespace DB
 {

 class Context;
-class IStorage;
-using StoragePtr = std::shared_ptr<IStorage>;
 class Join;
 using JoinPtr = std::shared_ptr<Join>;


@@ -1,378 +1,24 @@
 #include <Functions/FunctionsJSON.h>
 #include <Functions/FunctionFactory.h>
#include <Common/config.h>
#if USE_SIMDJSON
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
template <typename T>
class JSONNullableImplBase
{
public:
static DataTypePtr getType() { return std::make_shared<DataTypeNullable>(std::make_shared<T>()); }
static Field getDefault() { return {}; }
};
class JSONHasImpl : public JSONNullableImplBase<DataTypeUInt8>
{
public:
static constexpr auto name{"jsonHas"};
static Field getValue(ParsedJson::iterator &) { return {1}; }
};
class JSONLengthImpl : public JSONNullableImplBase<DataTypeUInt64>
{
public:
static constexpr auto name{"jsonLength"};
static Field getValue(ParsedJson::iterator & pjh)
{
if (!pjh.is_object_or_array())
return getDefault();
size_t size = 0;
if (pjh.down())
{
size += 1;
while (pjh.next())
size += 1;
}
return {size};
}
};
class JSONTypeImpl : public JSONNullableImplBase<DataTypeString>
{
public:
static constexpr auto name{"jsonType"};
static Field getValue(ParsedJson::iterator & pjh)
{
switch (pjh.get_type())
{
case '[':
return "Array";
case '{':
return "Object";
case '"':
return "String";
case 'l':
return "Int64";
case 'd':
return "Float64";
case 't':
return "Bool";
case 'f':
return "Bool";
case 'n':
return "Null";
default:
return "Unknown";
}
}
};
class JSONExtractImpl
{
public:
static constexpr auto name{"jsonExtract"};
static DataTypePtr getType(const DataTypePtr & type)
{
WhichDataType which{type};
if (which.isNativeUInt() || which.isNativeInt() || which.isFloat() || which.isEnum() || which.isDateOrDateTime()
|| which.isStringOrFixedString() || which.isInterval())
return std::make_shared<DataTypeNullable>(type);
if (which.isArray())
{
auto array_type = static_cast<const DataTypeArray *>(type.get());
return std::make_shared<DataTypeArray>(getType(array_type->getNestedType()));
}
if (which.isTuple())
{
auto tuple_type = static_cast<const DataTypeTuple *>(type.get());
DataTypes types;
types.reserve(tuple_type->getElements().size());
for (const DataTypePtr & element : tuple_type->getElements())
{
types.push_back(getType(element));
}
return std::make_shared<DataTypeTuple>(std::move(types));
}
throw Exception{"Unsupported return type schema: " + type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
static Field getDefault(const DataTypePtr & type)
{
WhichDataType which{type};
if (which.isNativeUInt() || which.isNativeInt() || which.isFloat() || which.isEnum() || which.isDateOrDateTime()
|| which.isStringOrFixedString() || which.isInterval())
return {};
if (which.isArray())
return {Array{}};
if (which.isTuple())
{
auto tuple_type = static_cast<const DataTypeTuple *>(type.get());
Tuple tuple;
tuple.toUnderType().reserve(tuple_type->getElements().size());
for (const DataTypePtr & element : tuple_type->getElements())
tuple.toUnderType().push_back(getDefault(element));
return {tuple};
}
// should not reach
throw Exception{"Unsupported return type schema: " + type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
static Field getValue(ParsedJson::iterator & pjh, const DataTypePtr & type)
{
WhichDataType which{type};
if (which.isNativeUInt() || which.isNativeInt() || which.isEnum() || which.isDateOrDateTime() || which.isInterval())
{
if (pjh.is_integer())
return {pjh.get_integer()};
else
return getDefault(type);
}
if (which.isFloat())
{
if (pjh.is_integer())
return {static_cast<double>(pjh.get_integer())};
else if (pjh.is_double())
return {pjh.get_double()};
else
return getDefault(type);
}
if (which.isStringOrFixedString())
{
if (pjh.is_string())
return {String{pjh.get_string()}};
else
return getDefault(type);
}
if (which.isArray())
{
if (!pjh.is_object_or_array())
return getDefault(type);
auto array_type = static_cast<const DataTypeArray *>(type.get());
Array array;
bool first = true;
while (first ? pjh.down() : pjh.next())
{
first = false;
ParsedJson::iterator pjh1{pjh};
array.push_back(getValue(pjh1, array_type->getNestedType()));
}
return {array};
}
if (which.isTuple())
{
if (!pjh.is_object_or_array())
return getDefault(type);
auto tuple_type = static_cast<const DataTypeTuple *>(type.get());
Tuple tuple;
tuple.toUnderType().reserve(tuple_type->getElements().size());
bool valid = true;
bool first = true;
for (const DataTypePtr & element : tuple_type->getElements())
{
if (valid)
{
valid &= first ? pjh.down() : pjh.next();
first = false;
ParsedJson::iterator pjh1{pjh};
tuple.toUnderType().push_back(getValue(pjh1, element));
}
else
tuple.toUnderType().push_back(getDefault(element));
}
return {tuple};
}
// should not reach
throw Exception{"Unsupported return type schema: " + type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
};
class JSONExtractUIntImpl : public JSONNullableImplBase<DataTypeUInt64>
{
public:
static constexpr auto name{"jsonExtractUInt"};
static Field getValue(ParsedJson::iterator & pjh)
{
if (pjh.is_integer())
return {pjh.get_integer()};
else
return getDefault();
}
};
class JSONExtractIntImpl : public JSONNullableImplBase<DataTypeInt64>
{
public:
static constexpr auto name{"jsonExtractInt"};
static Field getValue(ParsedJson::iterator & pjh)
{
if (pjh.is_integer())
return {pjh.get_integer()};
else
return getDefault();
}
};
class JSONExtractFloatImpl : public JSONNullableImplBase<DataTypeFloat64>
{
public:
static constexpr auto name{"jsonExtractFloat"};
static Field getValue(ParsedJson::iterator & pjh)
{
if (pjh.is_double())
return {pjh.get_double()};
else
return getDefault();
}
};
class JSONExtractBoolImpl : public JSONNullableImplBase<DataTypeUInt8>
{
public:
static constexpr auto name{"jsonExtractBool"};
static Field getValue(ParsedJson::iterator & pjh)
{
if (pjh.get_type() == 't')
return {1};
else if (pjh.get_type() == 'f')
return {0};
else
return getDefault();
}
};
// class JSONExtractRawImpl: public JSONNullableImplBase<DataTypeString>
// {
// public:
// static constexpr auto name {"jsonExtractRaw"};
// static Field getValue(ParsedJson::iterator & pjh)
// {
// //
// }
// };
class JSONExtractStringImpl : public JSONNullableImplBase<DataTypeString>
{
public:
static constexpr auto name{"jsonExtractString"};
static Field getValue(ParsedJson::iterator & pjh)
{
if (pjh.is_string())
return {String{pjh.get_string()}};
else
return getDefault();
}
};
}
#else
namespace DB
{
struct JSONHasImpl { static constexpr auto name{"jsonHas"}; };
struct JSONLengthImpl { static constexpr auto name{"jsonLength"}; };
struct JSONTypeImpl { static constexpr auto name{"jsonType"}; };
struct JSONExtractImpl { static constexpr auto name{"jsonExtract"}; };
struct JSONExtractUIntImpl { static constexpr auto name{"jsonExtractUInt"}; };
struct JSONExtractIntImpl { static constexpr auto name{"jsonExtractInt"}; };
struct JSONExtractFloatImpl { static constexpr auto name{"jsonExtractFloat"}; };
struct JSONExtractBoolImpl { static constexpr auto name{"jsonExtractBool"}; };
//struct JSONExtractRawImpl { static constexpr auto name {"jsonExtractRaw"}; };
struct JSONExtractStringImpl { static constexpr auto name{"jsonExtractString"}; };
}
#endif
 namespace DB
 {

 void registerFunctionsJSON(FunctionFactory & factory)
 {
-#if USE_SIMDJSON
-    if (__builtin_cpu_supports("avx2"))
-    {
-        factory.registerFunction<FunctionJSONBase<JSONHasImpl, false>>();
-        factory.registerFunction<FunctionJSONBase<JSONLengthImpl, false>>();
-        factory.registerFunction<FunctionJSONBase<JSONTypeImpl, false>>();
-        factory.registerFunction<FunctionJSONBase<JSONExtractImpl, true>>();
-        factory.registerFunction<FunctionJSONBase<JSONExtractUIntImpl, false>>();
-        factory.registerFunction<FunctionJSONBase<JSONExtractIntImpl, false>>();
-        factory.registerFunction<FunctionJSONBase<JSONExtractFloatImpl, false>>();
-        factory.registerFunction<FunctionJSONBase<JSONExtractBoolImpl, false>>();
-        // factory.registerFunction<FunctionJSONBase<
-        //     JSONExtractRawImpl,
-        //     false
-        // >>();
-        factory.registerFunction<FunctionJSONBase<JSONExtractStringImpl, false>>();
-        return;
-    }
-#endif
-    factory.registerFunction<FunctionJSONDummy<JSONHasImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONLengthImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONTypeImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONExtractImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONExtractUIntImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONExtractIntImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONExtractFloatImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONExtractBoolImpl>>();
-    //factory.registerFunction<FunctionJSONDummy<JSONExtractRawImpl>>();
-    factory.registerFunction<FunctionJSONDummy<JSONExtractStringImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONHas, JSONHasImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONLength, JSONLengthImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONKey, JSONKeyImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONType, JSONTypeImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractInt, JSONExtractInt64Impl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractUInt, JSONExtractUInt64Impl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractFloat, JSONExtractFloat64Impl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractBool, JSONExtractBoolImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractString, JSONExtractStringImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtract, JSONExtractImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractKeysAndValues, JSONExtractKeysAndValuesImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractRaw, JSONExtractRawImpl>>();
 }

 }

File diff suppressed because it is too large.


@@ -0,0 +1,211 @@
#pragma once
#include <Common/config.h>
#if USE_RAPIDJSON
#include <common/StringRef.h>
#include <Common/Exception.h>
#include <Core/Types.h>
#include <rapidjson/document.h>
namespace DB
{
/// This class can be used as an argument for the template class FunctionJSON.
/// It provides ability to parse JSONs using rapidjson library.
struct RapidJSONParser
{
static constexpr bool need_preallocate = false;
void preallocate(size_t) {}
bool parse(const StringRef & json)
{
document.Parse(json.data);
return !document.HasParseError();
}
struct Iterator
{
public:
Iterator() {}
Iterator(const rapidjson::Document & document) : value(&document) {}
Iterator(const Iterator & src)
: value(src.value)
, is_object_member(src.is_object_member)
, current_in_array(src.current_in_array)
, end_of_array(src.end_of_array) {}
Iterator & operator =(const Iterator & src)
{
value = src.value;
is_object_member = src.is_object_member;
current_in_array = src.current_in_array;
end_of_array = src.end_of_array;
return *this;
}
bool isInt64() const { return value->IsInt64(); }
bool isUInt64() const { return value->IsUint64(); }
bool isDouble() const { return value->IsDouble(); }
bool isBool() const { return value->IsBool(); }
bool isString() const { return value->IsString(); }
bool isArray() const { return value->IsArray(); }
bool isObject() const { return value->IsObject(); }
bool isNull() const { return value->IsNull(); }
Int64 getInt64() const { return value->GetInt64(); }
UInt64 getUInt64() const { return value->GetUint64(); }
double getDouble() const { return value->GetDouble(); }
bool getBool() const { return value->GetBool(); }
StringRef getString() const { return {value->GetString(), value->GetStringLength()}; }
size_t sizeOfArray() const { return value->Size(); }
bool arrayElementByIndex(size_t index)
{
if (index >= value->Size())
return false;
setRange(value->Begin() + index, value->End());
value = current_in_array++;
return true;
}
bool nextArrayElement()
{
if (current_in_array == end_of_array)
return false;
value = current_in_array++;
return true;
}
size_t sizeOfObject() const { return value->MemberCount(); }
bool objectMemberByIndex(size_t index)
{
if (index >= value->MemberCount())
return false;
setRange(value->MemberBegin() + index, value->MemberEnd());
value = &(current_in_object++)->value;
return true;
}
bool objectMemberByIndex(size_t index, StringRef & key)
{
if (index >= value->MemberCount())
return false;
setRange(value->MemberBegin() + index, value->MemberEnd());
key = getKeyImpl(current_in_object);
value = &(current_in_object++)->value;
return true;
}
bool objectMemberByName(const StringRef & name)
{
auto it = value->FindMember(name.data);
if (it == value->MemberEnd())
return false;
setRange(it, value->MemberEnd());
value = &(current_in_object++)->value;
return true;
}
bool nextObjectMember()
{
if (current_in_object == end_of_object)
return false;
value = &(current_in_object++)->value;
return true;
}
bool nextObjectMember(StringRef & key)
{
if (current_in_object == end_of_object)
return false;
key = getKeyImpl(current_in_object);
value = &(current_in_object++)->value;
return true;
}
bool isObjectMember() const { return is_object_member; }
StringRef getKey() const
{
return getKeyImpl(current_in_object - 1);
}
private:
void setRange(rapidjson::Value::ConstValueIterator current, rapidjson::Value::ConstValueIterator end)
{
current_in_array = &*current;
end_of_array = &*end;
is_object_member = false;
}
void setRange(rapidjson::Value::ConstMemberIterator current, rapidjson::Value::ConstMemberIterator end)
{
current_in_object = &*current;
end_of_object = &*end;
is_object_member = true;
}
static StringRef getKeyImpl(const rapidjson::GenericMember<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>> * member)
{
const auto & name = member->name;
return {name.GetString(), name.GetStringLength()};
}
const rapidjson::Value * value = nullptr;
bool is_object_member = false;
union
{
const rapidjson::GenericMember<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>> * current_in_object;
const rapidjson::Value * current_in_array;
};
union
{
const rapidjson::GenericMember<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>> * end_of_object;
const rapidjson::Value * end_of_array;
};
};
Iterator getRoot() { return Iterator{document}; }
static bool isInt64(const Iterator & it) { return it.isInt64(); }
static bool isUInt64(const Iterator & it) { return it.isUInt64(); }
static bool isDouble(const Iterator & it) { return it.isDouble(); }
static bool isBool(const Iterator & it) { return it.isBool(); }
static bool isString(const Iterator & it) { return it.isString(); }
static bool isArray(const Iterator & it) { return it.isArray(); }
static bool isObject(const Iterator & it) { return it.isObject(); }
static bool isNull(const Iterator & it) { return it.isNull(); }
static Int64 getInt64(const Iterator & it) { return it.getInt64(); }
static UInt64 getUInt64(const Iterator & it) { return it.getUInt64(); }
static double getDouble(const Iterator & it) { return it.getDouble(); }
static bool getBool(const Iterator & it) { return it.getBool(); }
static StringRef getString(const Iterator & it) { return it.getString(); }
static size_t sizeOfArray(const Iterator & it) { return it.sizeOfArray(); }
static bool firstArrayElement(Iterator & it) { return it.arrayElementByIndex(0); }
static bool arrayElementByIndex(Iterator & it, size_t index) { return it.arrayElementByIndex(index); }
static bool nextArrayElement(Iterator & it) { return it.nextArrayElement(); }
static size_t sizeOfObject(const Iterator & it) { return it.sizeOfObject(); }
static bool firstObjectMember(Iterator & it) { return it.objectMemberByIndex(0); }
static bool firstObjectMember(Iterator & it, StringRef & first_key) { return it.objectMemberByIndex(0, first_key); }
static bool objectMemberByIndex(Iterator & it, size_t index) { return it.objectMemberByIndex(index); }
static bool objectMemberByName(Iterator & it, const StringRef & name) { return it.objectMemberByName(name); }
static bool nextObjectMember(Iterator & it) { return it.nextObjectMember(); }
static bool nextObjectMember(Iterator & it, StringRef & next_key) { return it.nextObjectMember(next_key); }
static bool isObjectMember(const Iterator & it) { return it.isObjectMember(); }
static StringRef getKey(const Iterator & it) { return it.getKey(); }
private:
rapidjson::Document document;
};
}
#endif
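For readers who have not used rapidjson directly, a minimal standalone use of the underlying DOM API that the wrapper above builds on (only basic, well-known rapidjson calls):

#include <cstdio>
#include <rapidjson/document.h>

int main()
{
    const char * json = R"({"name": "clickhouse", "year": 2019, "tags": ["db", "olap"]})";

    rapidjson::Document doc;
    doc.Parse(json);                        // same entry point the wrapper's parse() uses
    if (doc.HasParseError() || !doc.IsObject())
        return 1;

    // Scalar members.
    if (doc.HasMember("name") && doc["name"].IsString())
        std::printf("name = %s\n", doc["name"].GetString());
    if (doc.HasMember("year") && doc["year"].IsInt64())
        std::printf("year = %lld\n", static_cast<long long>(doc["year"].GetInt64()));

    // Array members, walked with the same Begin()/End() iterators the wrapper relies on.
    const auto & tags = doc["tags"];
    if (tags.IsArray())
        for (auto it = tags.Begin(); it != tags.End(); ++it)
            if (it->IsString())
                std::printf("tag = %s\n", it->GetString());

    // Object members expose name/value pairs via MemberBegin()/MemberEnd().
    for (auto m = doc.MemberBegin(); m != doc.MemberEnd(); ++m)
        std::printf("member: %s\n", m->name.GetString());

    return 0;
}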


@@ -0,0 +1,150 @@
#pragma once
#include <Common/config.h>
#if USE_SIMDJSON
#include <common/StringRef.h>
#include <Common/Exception.h>
#include <Core/Types.h>
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wold-style-cast"
#pragma clang diagnostic ignored "-Wnewline-eof"
#endif
#include <simdjson/jsonparser.h>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_ALLOCATE_MEMORY;
}
/// This class can be used as an argument for the template class FunctionJSON.
/// It provides ability to parse JSONs using simdjson library.
struct SimdJSONParser
{
static constexpr bool need_preallocate = true;
void preallocate(size_t max_size)
{
if (!pj.allocateCapacity(max_size))
throw Exception{"Can not allocate memory for " + std::to_string(max_size) + " units when parsing JSON",
ErrorCodes::CANNOT_ALLOCATE_MEMORY};
}
bool parse(const StringRef & json) { return !json_parse(json.data, json.size, pj); }
using Iterator = ParsedJson::iterator;
Iterator getRoot() { return Iterator{pj}; }
static bool isInt64(const Iterator & it) { return it.is_integer(); }
static bool isUInt64(const Iterator &) { return false; /* See https://github.com/lemire/simdjson/issues/68 */ }
static bool isDouble(const Iterator & it) { return it.is_double(); }
static bool isString(const Iterator & it) { return it.is_string(); }
static bool isArray(const Iterator & it) { return it.is_array(); }
static bool isObject(const Iterator & it) { return it.is_object(); }
static bool isBool(const Iterator & it) { return it.get_type() == 't' || it.get_type() == 'f'; }
static bool isNull(const Iterator & it) { return it.is_null(); }
static Int64 getInt64(const Iterator & it) { return it.get_integer(); }
static UInt64 getUInt64(const Iterator &) { return 0; /* isUInt64() never returns true */ }
static double getDouble(const Iterator & it) { return it.get_double(); }
static bool getBool(const Iterator & it) { return it.get_type() == 't'; }
static StringRef getString(const Iterator & it) { return StringRef{it.get_string(), it.get_string_length()}; }
static size_t sizeOfArray(const Iterator & it)
{
size_t size = 0;
Iterator it2 = it;
if (it2.down())
{
do
++size;
while (it2.next());
}
return size;
}
static bool firstArrayElement(Iterator & it) { return it.down(); }
static bool arrayElementByIndex(Iterator & it, size_t index)
{
if (!it.down())
return false;
while (index--)
if (!it.next())
return false;
return true;
}
static bool nextArrayElement(Iterator & it) { return it.next(); }
static size_t sizeOfObject(const Iterator & it)
{
size_t size = 0;
Iterator it2 = it;
if (it2.down())
{
do
++size;
while (it2.next() && it2.next());
}
return size;
}
static bool firstObjectMember(Iterator & it) { return it.down() && it.next(); }
static bool firstObjectMember(Iterator & it, StringRef & first_key)
{
if (!it.down())
return false;
first_key.data = it.get_string();
first_key.size = it.get_string_length();
return it.next();
}
static bool objectMemberByIndex(Iterator & it, size_t index)
{
if (!it.down())
return false;
while (index--)
if (!it.next() || !it.next())
return false;
return it.next();
}
static bool objectMemberByName(Iterator & it, const StringRef & name) { return it.move_to_key(name.data); }
static bool nextObjectMember(Iterator & it) { return it.next() && it.next(); }
static bool nextObjectMember(Iterator & it, StringRef & next_key)
{
if (!it.next())
return false;
next_key.data = it.get_string();
next_key.size = it.get_string_length();
return it.next();
}
static bool isObjectMember(const Iterator & it) { return it.get_scope_type() == '{'; }
static StringRef getKey(const Iterator & it)
{
Iterator it2 = it;
it2.prev();
return StringRef{it2.get_string(), it2.get_string_length()};
}
private:
ParsedJson pj;
};
}
#endif
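Illustrative only, not part of this diff: a minimal caller of the parser interface above, assuming the header is reachable as <Functions/SimdJSONParser.h> and the build has simdjson enabled.
#include <Functions/SimdJSONParser.h>   /// include path assumed
#include <common/StringRef.h>
#include <iostream>
int main()
{
    using namespace DB;
    StringRef json{"[1, 2, 3]", 9};
    SimdJSONParser parser;
    parser.preallocate(json.size);       /// need_preallocate == true, so reserve capacity before parsing
    if (!parser.parse(json))
        return 1;
    auto root = parser.getRoot();
    if (SimdJSONParser::isArray(root))
        std::cout << SimdJSONParser::sizeOfArray(root) << " elements\n";
    return 0;
}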

View File

@ -315,13 +315,13 @@ SOFTWARE.
} }
}; };
struct NameValidUTF8 struct NameIsValidUTF8
{ {
static constexpr auto name = "isValidUTF8"; static constexpr auto name = "isValidUTF8";
}; };
using FunctionValidUTF8 = FunctionStringOrArrayToT<ValidUTF8Impl, NameValidUTF8, UInt8>; using FunctionValidUTF8 = FunctionStringOrArrayToT<ValidUTF8Impl, NameIsValidUTF8, UInt8>;
void registerFunctionValidUTF8(FunctionFactory & factory) void registerFunctionIsValidUTF8(FunctionFactory & factory)
{ {
factory.registerFunction<FunctionValidUTF8>(); factory.registerFunction<FunctionValidUTF8>();
} }

View File

@ -9,7 +9,8 @@ void registerFunctionEmpty(FunctionFactory &);
void registerFunctionNotEmpty(FunctionFactory &); void registerFunctionNotEmpty(FunctionFactory &);
void registerFunctionLength(FunctionFactory &); void registerFunctionLength(FunctionFactory &);
void registerFunctionLengthUTF8(FunctionFactory &); void registerFunctionLengthUTF8(FunctionFactory &);
void registerFunctionValidUTF8(FunctionFactory &); void registerFunctionIsValidUTF8(FunctionFactory &);
void registerFunctionToValidUTF8(FunctionFactory &);
void registerFunctionLower(FunctionFactory &); void registerFunctionLower(FunctionFactory &);
void registerFunctionUpper(FunctionFactory &); void registerFunctionUpper(FunctionFactory &);
void registerFunctionLowerUTF8(FunctionFactory &); void registerFunctionLowerUTF8(FunctionFactory &);
@ -36,7 +37,8 @@ void registerFunctionsString(FunctionFactory & factory)
registerFunctionNotEmpty(factory); registerFunctionNotEmpty(factory);
registerFunctionLength(factory); registerFunctionLength(factory);
registerFunctionLengthUTF8(factory); registerFunctionLengthUTF8(factory);
registerFunctionValidUTF8(factory); registerFunctionIsValidUTF8(factory);
registerFunctionToValidUTF8(factory);
registerFunctionLower(factory); registerFunctionLower(factory);
registerFunctionUpper(factory); registerFunctionUpper(factory);
registerFunctionLowerUTF8(factory); registerFunctionLowerUTF8(factory);

View File

@ -0,0 +1,144 @@
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <IO/WriteBufferFromVector.h>
#include <IO/WriteBufferValidUTF8.h>
#include <IO/WriteHelpers.h>
#include <Poco/UTF8Encoding.h>
#include <string_view>
#ifdef __SSE2__
# include <emmintrin.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
extern const UInt8 length_of_utf8_sequence[256];
struct ToValidUTF8Impl
{
static void toValidUTF8One(const char * begin, const char * end, WriteBuffer & write_buffer)
{
static constexpr std::string_view replacement = "\xEF\xBF\xBD";
const char * p = begin;
const char * valid_start = begin;
/// The last recorded character was `replacement`.
bool just_put_replacement = false;
auto put_valid = [&write_buffer, &just_put_replacement](const char * data, size_t len)
{
if (len == 0)
return;
just_put_replacement = false;
write_buffer.write(data, len);
};
auto put_replacement = [&write_buffer, &just_put_replacement]()
{
if (just_put_replacement)
return;
just_put_replacement = true;
write_buffer.write(replacement.data(), replacement.size());
};
while (p < end)
{
#ifdef __SSE2__
/// Fast skip of ASCII
static constexpr size_t SIMD_BYTES = 16;
const char * simd_end = p + (end - p) / SIMD_BYTES * SIMD_BYTES;
while (p < simd_end && !_mm_movemask_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(p))))
p += SIMD_BYTES;
if (!(p < end))
break;
#endif
size_t len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
if (len > 4)
{
/// Invalid start of sequence. Skip one byte.
put_valid(valid_start, p - valid_start);
put_replacement();
++p;
valid_start = p;
}
else if (p + len > end)
{
/// Sequence was not fully written to this buffer.
break;
}
else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<const unsigned char *>(p), len))
{
/// Valid sequence.
p += len;
}
else
{
/// Invalid sequence. Skip just first byte.
put_valid(valid_start, p - valid_start);
put_replacement();
++p;
valid_start = p;
}
}
put_valid(valid_start, p - valid_start);
if (p != end)
put_replacement();
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
const size_t offsets_size = offsets.size();
/// The result can be larger than the source data, but we assume this is unlikely.
res_data.resize(data.size());
res_offsets.resize(offsets_size);
size_t prev_offset = 0;
WriteBufferFromVector<ColumnString::Chars> write_buffer(res_data);
for (size_t i = 0; i < offsets_size; ++i)
{
const char * haystack_data = reinterpret_cast<const char *>(&data[prev_offset]);
const size_t haystack_size = offsets[i] - prev_offset - 1;
toValidUTF8One(haystack_data, haystack_data + haystack_size, write_buffer);
writeChar(0, write_buffer);
res_offsets[i] = write_buffer.count();
prev_offset = offsets[i];
}
write_buffer.finish();
}
static void vector_fixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
{
throw Exception("Column of type FixedString is not supported by toValidUTF8 function", ErrorCodes::ILLEGAL_COLUMN);
}
};
struct NameToValidUTF8
{
static constexpr auto name = "toValidUTF8";
};
using FunctionToValidUTF8 = FunctionStringToString<ToValidUTF8Impl, NameToValidUTF8>;
void registerFunctionToValidUTF8(FunctionFactory & factory)
{
factory.registerFunction<FunctionToValidUTF8>();
}
}
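The SSE2 fast path above is worth isolating; a self-contained sketch (illustrative only, it mirrors the skip loop in toValidUTF8One and then finishes byte by byte) might look like:
#include <cstddef>
#include <cstdio>
#ifdef __SSE2__
#include <emmintrin.h>
#endif
/// Advance p past a prefix of pure ASCII, 16 bytes at a time when SSE2 is available.
static const char * skipAscii(const char * p, const char * end)
{
#ifdef __SSE2__
    static constexpr size_t SIMD_BYTES = 16;
    const char * simd_end = p + (end - p) / SIMD_BYTES * SIMD_BYTES;
    while (p < simd_end && !_mm_movemask_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(p))))
        p += SIMD_BYTES;
#endif
    while (p < end && !(static_cast<unsigned char>(*p) & 0x80))
        ++p;
    return p;
}
int main()
{
    const char data[] = "plain ASCII then \xC3\xA9";
    const char * first_non_ascii = skipAscii(data, data + sizeof(data) - 1);
    std::printf("first non-ASCII byte at offset %td\n", first_non_ascii - data);
    return 0;
}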

View File

@ -38,18 +38,29 @@ private:
working_buffer = internal_buffer; working_buffer = internal_buffer;
} }
static constexpr size_t initial_size = 32;
public: public:
WriteBufferFromVector(VectorType & vector_) WriteBufferFromVector(VectorType & vector_)
: WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_) : WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
{ {
if (vector.empty()) if (vector.empty())
{ {
static constexpr size_t initial_size = 32;
vector.resize(initial_size); vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size()); set(reinterpret_cast<Position>(vector.data()), vector.size());
} }
} }
/// Append to vector instead of rewrite.
struct AppendModeTag {};
WriteBufferFromVector(VectorType & vector_, AppendModeTag)
: WriteBuffer(nullptr, 0), vector(vector_)
{
size_t old_size = vector.size();
vector.resize(vector.capacity() < initial_size ? initial_size : vector.capacity());
set(reinterpret_cast<Position>(vector.data() + old_size), (vector.size() - old_size) * sizeof(typename VectorType::value_type));
}
void finish() void finish()
{ {
if (is_finished) if (is_finished)
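A usage sketch of the new append mode (hypothetical caller code, assuming the ClickHouse headers; not part of this diff):
#include <IO/WriteBufferFromVector.h>
#include <vector>
/// Append bytes to `out` without discarding what is already there.
void appendGreeting(std::vector<char> & out)
{
    using Buffer = DB::WriteBufferFromVector<std::vector<char>>;
    Buffer buf(out, Buffer::AppendModeTag{});   /// keeps the existing contents
    buf.write(" world", 6);
    buf.finish();                               /// trims the unused tail so the vector ends right after the appended bytes
}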

View File

@ -12,16 +12,14 @@ namespace DB
const size_t WriteBufferValidUTF8::DEFAULT_SIZE = 4096; const size_t WriteBufferValidUTF8::DEFAULT_SIZE = 4096;
namespace /** Index into the table below with the first byte of a UTF-8 sequence to
{
/** Index into the table below with the first byte of a UTF-8 sequence to
* get the number of trailing bytes that are supposed to follow it. * get the number of trailing bytes that are supposed to follow it.
* Note that *legal* UTF-8 values can't have 4 or 5-bytes. The table is * Note that *legal* UTF-8 values can't have 4 or 5-bytes. The table is
* left as-is for anyone who may want to do such conversion, which was * left as-is for anyone who may want to do such conversion, which was
* allowed in earlier algorithms. * allowed in earlier algorithms.
*/ */
const UInt8 length_of_utf8_sequence[256] = extern const UInt8 length_of_utf8_sequence[256] =
{ {
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
@ -30,8 +28,7 @@ namespace
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4,5,5,5,5,6,6,6,6 3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4,5,5,5,5,6,6,6,6
}; };
}
WriteBufferValidUTF8::WriteBufferValidUTF8( WriteBufferValidUTF8::WriteBufferValidUTF8(
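For reference, the table encodes a simple mapping from the lead byte to the expected sequence length; a compact arithmetic equivalent (illustrative sketch, not part of the diff) is:
#include <cstdio>
/// Expected UTF-8 sequence length from the lead byte, matching the table above
/// (values 5 and 6 are kept only for the historical, now-illegal encodings).
static unsigned utf8SequenceLength(unsigned char lead)
{
    if (lead < 0xC0) return 1;   /// ASCII or a stray continuation byte
    if (lead < 0xE0) return 2;
    if (lead < 0xF0) return 3;
    if (lead < 0xF8) return 4;
    if (lead < 0xFC) return 5;
    return 6;
}
int main()
{
    const unsigned char samples[] = {'A', 0xC3, 0xE2, 0xF0, 0xFF};
    for (unsigned char c : samples)
        std::printf("lead 0x%02X -> %u byte(s)\n", c, utf8SequenceLength(c));
    return 0;
}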

View File

@ -1,14 +1,16 @@
#pragma once #pragma once
#include <Core/Types.h> #include <Core/Types.h>
#include <Storages/IStorage_fwd.h>
#include <Common/ActionLock.h> #include <Common/ActionLock.h>
#include <unordered_map>
#include <mutex> #include <mutex>
#include <unordered_map>
namespace DB namespace DB
{ {
class IStorage;
class Context; class Context;
/// Holds ActionLocks for tables /// Holds ActionLocks for tables

View File

@ -18,7 +18,7 @@
#include <Common/LRUCache.h> #include <Common/LRUCache.h>
#include <Common/ColumnsHashing.h> #include <Common/ColumnsHashing.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>

View File

@ -1,9 +1,8 @@
#pragma once #pragma once
#include <Client/ConnectionPool.h>
#include <Interpreters/Cluster.h> #include <Interpreters/Cluster.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Client/ConnectionPool.h>
namespace DB namespace DB
{ {

View File

@ -1,8 +1,8 @@
#pragma once #pragma once
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <Storages/IStorage.h> #include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Storages/IStorage_fwd.h>
namespace DB namespace DB
{ {

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h> #include <Interpreters/Cluster.h>
namespace DB namespace DB

View File

@ -2,14 +2,16 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <Interpreters/ClientInfo.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Core/Types.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ClientInfo.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <Common/LRUCache.h> #include <Common/LRUCache.h>
#include <Common/MultiVersion.h> #include <Common/MultiVersion.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/config.h> #include <Common/config.h>
#include <Storages/IStorage_fwd.h>
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
@ -65,14 +67,7 @@ struct MergeTreeSettings;
class IDatabase; class IDatabase;
class DDLGuard; class DDLGuard;
class DDLWorker; class DDLWorker;
class IStorage;
class ITableFunction; class ITableFunction;
using StoragePtr = std::shared_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
class IBlockInputStream;
class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
class Block; class Block;
class ActionLocksManager; class ActionLocksManager;
using ActionLocksManagerPtr = std::shared_ptr<ActionLocksManager>; using ActionLocksManagerPtr = std::shared_ptr<ActionLocksManager>;

View File

@ -10,6 +10,7 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h> #include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
namespace DB namespace DB

View File

@ -1,15 +1,16 @@
#pragma once #pragma once
#include <Interpreters/Context.h>
#include <Common/config.h>
#include <Common/SipHash.h>
#include <Core/Settings.h>
#include <Core/Names.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Names.h>
#include <Core/Settings.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context.h>
#include <Common/SipHash.h>
#include <Common/config.h>
#include <unordered_set>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
namespace DB namespace DB
@ -37,9 +38,6 @@ using FunctionBuilderPtr = std::shared_ptr<IFunctionBuilder>;
class IDataType; class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>; using DataTypePtr = std::shared_ptr<const IDataType>;
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class ExpressionActions; class ExpressionActions;
/** Action on the block. /** Action on the block.

View File

@ -39,6 +39,7 @@
#include <Storages/StorageJoin.h> #include <Storages/StorageJoin.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <DataStreams/IBlockInputStream.h>
#include <Dictionaries/IDictionary.h> #include <Dictionaries/IDictionary.h>

View File

@ -1,10 +1,12 @@
#pragma once #pragma once
#include <Core/Settings.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ActionsVisitor.h> #include <Interpreters/ActionsVisitor.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>
#include <Core/Settings.h>
#include <Interpreters/SyntaxAnalyzer.h> #include <Interpreters/SyntaxAnalyzer.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
namespace DB namespace DB
@ -19,13 +21,6 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct ASTTableJoin; struct ASTTableJoin;
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
class ASTFunction; class ASTFunction;
class ASTExpressionList; class ASTExpressionList;
class ASTSelectQuery; class ASTSelectQuery;

View File

@ -1,16 +1,17 @@
#pragma once #pragma once
#include <Core/SettingsCommon.h>
#include <Core/Types.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <string> #include <string>
#include <memory> #include <memory>
#include <Core/Types.h>
#include <Core/SettingsCommon.h>
#include <Parsers/IAST_fwd.h>
namespace DB namespace DB
{ {
class IStorage;
class ASTSelectQuery; class ASTSelectQuery;
class Context; class Context;

View File

@ -1,7 +1,8 @@
#pragma once #pragma once
#include <Core/Block.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/IAST.h> #include <Parsers/IAST_fwd.h>
namespace DB namespace DB
{ {

View File

@ -2,6 +2,7 @@
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IndicesDescription.h> #include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h> #include <Storages/ConstraintsDescription.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
@ -13,8 +14,6 @@ namespace DB
class Context; class Context;
class ASTCreateQuery; class ASTCreateQuery;
class ASTExpressionList; class ASTExpressionList;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
/** Allows to create new table or database, /** Allows to create new table or database,

View File

@ -1,7 +1,8 @@
#pragma once #pragma once
#include <Core/Block.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/IAST.h> #include <Parsers/IAST_fwd.h>
namespace DB namespace DB

View File

@ -4,13 +4,14 @@
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h> #include <Interpreters/SelectQueryOptions.h>
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/TableStructureLockHolder.h>
namespace Poco { class Logger; } namespace Poco { class Logger; }

View File

@ -2,16 +2,16 @@
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
namespace Poco { class Logger; }
namespace DB namespace DB
{ {
class Context; class Context;
class ASTSystemQuery; class ASTSystemQuery;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
class InterpreterSystemQuery : public IInterpreter class InterpreterSystemQuery : public IInterpreter
{ {

View File

@ -18,7 +18,7 @@
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockStream_fwd.h>
#include <variant> #include <variant>
#include <common/constexpr_helpers.h> #include <common/constexpr_helpers.h>

View File

@ -1,10 +1,10 @@
#pragma once #pragma once
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <Storages/IStorage.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MutationCommands.h> #include <Storages/MutationCommands.h>

View File

@ -1,25 +1,28 @@
#pragma once #pragma once
#include <map>
#include <Core/Defines.h>
#include <DataStreams/BlockIO.h>
#include <IO/Progress.h>
#include <Interpreters/CancellationCode.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/QueryPriorities.h>
#include <Storages/IStorage_fwd.h>
#include <Poco/Condition.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <condition_variable>
#include <list> #include <list>
#include <map>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <shared_mutex> #include <shared_mutex>
#include <Poco/Condition.h> #include <unordered_map>
#include <Core/Defines.h>
#include <IO/Progress.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/Throttler.h>
#include <Common/CurrentThread.h>
#include <Interpreters/QueryPriorities.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/CancellationCode.h>
#include <DataStreams/BlockIO.h>
namespace CurrentMetrics namespace CurrentMetrics
@ -30,9 +33,6 @@ namespace CurrentMetrics
namespace DB namespace DB
{ {
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
class Context; class Context;
struct Settings; struct Settings;
class IAST; class IAST;

View File

@ -1,15 +1,13 @@
#pragma once #pragma once
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/Aliases.h> #include <Interpreters/Aliases.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/SelectQueryOptions.h> #include <Interpreters/SelectQueryOptions.h>
#include <Storages/IStorage_fwd.h>
namespace DB namespace DB
{ {
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
NameSet removeDuplicateColumns(NamesAndTypesList & columns); NameSet removeDuplicateColumns(NamesAndTypesList & columns);
struct SyntaxAnalyzerResult struct SyntaxAnalyzerResult

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTWithAlias.h> #include <Parsers/ASTWithAlias.h>
#include <Parsers/ASTSubquery.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
@ -141,7 +142,7 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
} }
} }
/** need_parens - do I need parentheses around the expression with the operator. /** need_parens - do we need parentheses around the expression with the operator.
* They are needed only if this expression is included in another expression with the operator. * They are needed only if this expression is included in another expression with the operator.
*/ */
@ -182,7 +183,22 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
&& (name == "like" || name == "notLike") && (name == "like" || name == "notLike")
&& highlightStringLiteralWithMetacharacters(arguments->children[1], settings, "%_"); && highlightStringLiteralWithMetacharacters(arguments->children[1], settings, "%_");
if (!special_hilite) /// Format x IN 1 as x IN (1): put parens around rhs even if there is a single element in set.
const auto * second_arg_func = arguments->children[1]->as<ASTFunction>();
const auto * second_arg_literal = arguments->children[1]->as<ASTLiteral>();
bool extra_parens_around_in_rhs = (name == "in" || name == "notIn" || name == "globalIn" || name == "globalNotIn")
&& !(second_arg_func && second_arg_func->name == "tuple")
&& !(second_arg_literal && second_arg_literal->value.getType() == Field::Types::Tuple)
&& !arguments->children[1]->as<ASTSubquery>();
if (extra_parens_around_in_rhs)
{
settings.ostr << '(';
arguments->children[1]->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << ')';
}
if (!special_hilite && !extra_parens_around_in_rhs)
arguments->children[1]->formatImpl(settings, state, nested_need_parens); arguments->children[1]->formatImpl(settings, state, nested_need_parens);
if (frame.need_parens) if (frame.need_parens)
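A self-contained sketch of the condition above (extra parentheses around the right-hand side of IN), using simplified stand-in types instead of the real AST classes; the names here are hypothetical and for illustration only:
#include <iostream>
#include <string>
struct RhsInfo { bool is_tuple_function; bool is_tuple_literal; bool is_subquery; };
/// Wrap the right-hand side of IN in parentheses unless it is already a tuple call,
/// a tuple literal, or a subquery.
static bool needsExtraParens(const std::string & func_name, const RhsInfo & rhs)
{
    const bool in_family = func_name == "in" || func_name == "notIn"
        || func_name == "globalIn" || func_name == "globalNotIn";
    return in_family && !rhs.is_tuple_function && !rhs.is_tuple_literal && !rhs.is_subquery;
}
int main()
{
    std::cout << needsExtraParens("in", {false, false, false}) << '\n';   /// 1: "x IN 1" is printed as "x IN (1)"
    std::cout << needsExtraParens("in", {true, false, false}) << '\n';    /// 0: "x IN (1, 2)" is left as is
    return 0;
}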

View File

@ -10,7 +10,7 @@ class ParserQuery : public IParserBase
{ {
private: private:
const char * end; const char * end;
bool enable_explain; bool enable_explain; /// Allow queries prefixed with AST and ANALYZE for development purposes.
const char * getName() const override { return "Query"; } const char * getName() const override { return "Query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;

View File

@ -3,6 +3,7 @@
#include <optional> #include <optional>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IndicesDescription.h> #include <Storages/IndicesDescription.h>
@ -89,7 +90,6 @@ struct AlterCommand
bool isMutable() const; bool isMutable() const;
}; };
class IStorage;
class Context; class Context;
class AlterCommands : public std::vector<AlterCommand> class AlterCommands : public std::vector<AlterCommand>

View File

@ -1,11 +1,331 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/AlterCommands.h> #include <Storages/AlterCommands.h>
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
namespace DB namespace DB
{ {
void IStorage::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & table_lock_holder) namespace ErrorCodes
{
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
extern const int DUPLICATE_COLUMN;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int LOGICAL_ERROR;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int TABLE_IS_DROPPED;
extern const int TYPE_MISMATCH;
}
IStorage::IStorage(ColumnsDescription columns_)
{
setColumns(std::move(columns_));
}
const ColumnsDescription & IStorage::getColumns() const
{
return columns;
}
void IStorage::setColumns(ColumnsDescription columns_)
{
if (columns_.getOrdinary().empty())
throw Exception("Empty list of columns passed", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
columns = std::move(columns_);
}
const IndicesDescription & IStorage::getIndices() const
{
return indices;
}
void IStorage::setIndices(IndicesDescription indices_)
{
indices = std::move(indices_);
}
NameAndTypePair IStorage::getColumn(const String & column_name) const
{
/// By default, we assume that there are no virtual columns in the storage.
return getColumns().getPhysical(column_name);
}
bool IStorage::hasColumn(const String & column_name) const
{
/// By default, we assume that there are no virtual columns in the storage.
return getColumns().hasPhysical(column_name);
}
Block IStorage::getSampleBlock() const
{
Block res;
for (const auto & column : getColumns().getAllPhysical())
res.insert({column.type->createColumn(), column.type, column.name});
return res;
}
Block IStorage::getSampleBlockNonMaterialized() const
{
Block res;
for (const auto & column : getColumns().getOrdinary())
res.insert({column.type->createColumn(), column.type, column.name});
return res;
}
Block IStorage::getSampleBlockForColumns(const Names & column_names) const
{
Block res;
NamesAndTypesList all_columns = getColumns().getAll();
std::unordered_map<String, DataTypePtr> columns_map;
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
for (const auto & name : column_names)
{
auto it = columns_map.find(name);
if (it != columns_map.end())
{
res.insert({it->second->createColumn(), it->second, it->first});
}
else
{
/// Virtual columns.
NameAndTypePair elem = getColumn(name);
res.insert({elem.type->createColumn(), elem.type, elem.name});
}
}
return res;
}
namespace
{
using NamesAndTypesMap = GOOGLE_NAMESPACE::dense_hash_map<StringRef, const IDataType *, StringRefHash>;
using UniqueStrings = GOOGLE_NAMESPACE::dense_hash_set<StringRef, StringRefHash>;
String listOfColumns(const NamesAndTypesList & available_columns)
{
std::stringstream ss;
for (auto it = available_columns.begin(); it != available_columns.end(); ++it)
{
if (it != available_columns.begin())
ss << ", ";
ss << it->name;
}
return ss.str();
}
NamesAndTypesMap getColumnsMap(const NamesAndTypesList & columns)
{
NamesAndTypesMap res;
res.set_empty_key(StringRef());
for (const auto & column : columns)
res.insert({column.name, column.type.get()});
return res;
}
UniqueStrings initUniqueStrings()
{
UniqueStrings strings;
strings.set_empty_key(StringRef());
return strings;
}
}
void IStorage::check(const Names & column_names) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const String list_of_columns = listOfColumns(available_columns);
if (column_names.empty())
throw Exception("Empty list of columns queried. There are columns: " + list_of_columns, ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
const auto columns_map = getColumnsMap(available_columns);
auto unique_names = initUniqueStrings();
for (const auto & name : column_names)
{
if (columns_map.end() == columns_map.find(name))
throw Exception(
"There is no column with name " + name + " in table. There are columns: " + list_of_columns,
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (unique_names.end() != unique_names.find(name))
throw Exception("Column " + name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(name);
}
}
void IStorage::check(const NamesAndTypesList & provided_columns) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const auto columns_map = getColumnsMap(available_columns);
auto unique_names = initUniqueStrings();
for (const NameAndTypePair & column : provided_columns)
{
auto it = columns_map.find(column.name);
if (columns_map.end() == it)
throw Exception(
"There is no column with name " + column.name + ". There are columns: " + listOfColumns(available_columns),
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!column.type->equals(*it->second))
throw Exception(
"Type mismatch for column " + column.name + ". Column has type " + it->second->getName() + ", got type "
+ column.type->getName(),
ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(column.name))
throw Exception("Column " + column.name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(column.name);
}
}
void IStorage::check(const NamesAndTypesList & provided_columns, const Names & column_names) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const auto available_columns_map = getColumnsMap(available_columns);
const auto & provided_columns_map = getColumnsMap(provided_columns);
if (column_names.empty())
throw Exception(
"Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
auto unique_names = initUniqueStrings();
for (const String & name : column_names)
{
auto it = provided_columns_map.find(name);
if (provided_columns_map.end() == it)
continue;
auto jt = available_columns_map.find(name);
if (available_columns_map.end() == jt)
throw Exception(
"There is no column with name " + name + ". There are columns: " + listOfColumns(available_columns),
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!it->second->equals(*jt->second))
throw Exception(
"Type mismatch for column " + name + ". Column has type " + jt->second->getName() + ", got type " + it->second->getName(),
ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(name))
throw Exception("Column " + name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(name);
}
}
void IStorage::check(const Block & block, bool need_all) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const auto columns_map = getColumnsMap(available_columns);
NameSet names_in_block;
block.checkNumberOfRows();
for (const auto & column : block)
{
if (names_in_block.count(column.name))
throw Exception("Duplicate column " + column.name + " in block", ErrorCodes::DUPLICATE_COLUMN);
names_in_block.insert(column.name);
auto it = columns_map.find(column.name);
if (columns_map.end() == it)
throw Exception(
"There is no column with name " + column.name + ". There are columns: " + listOfColumns(available_columns),
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!column.type->equals(*it->second))
throw Exception(
"Type mismatch for column " + column.name + ". Column has type " + it->second->getName() + ", got type "
+ column.type->getName(),
ErrorCodes::TYPE_MISMATCH);
}
if (need_all && names_in_block.size() < columns_map.size())
{
for (auto it = available_columns.begin(); it != available_columns.end(); ++it)
{
if (!names_in_block.count(it->name))
throw Exception("Expected column " + it->name, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
void IStorage::lockNewDataStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
}
void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
}
TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
return result;
}
void IStorage::alter(
const AlterCommands & params,
const String & database_name,
const String & table_name,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{ {
for (const auto & param : params) for (const auto & param : params)
{ {
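For orientation, a hypothetical caller-side sketch of the lock order these methods implement (illustrative only, not part of the diff):
#include <Storages/IStorage.h>
#include <string>
/// Read path: share the structure; no new data will be added.
void readFromTable(const DB::StoragePtr & table, const std::string & query_id)
{
    auto lock = table->lockStructureForShare(/* will_add_new_data = */ false, query_id);
    /// ... build and run the read pipeline while `lock` is held ...
}
/// ALTER path: announce the intention first, then upgrade to the exclusive structure lock.
void alterTable(const DB::StoragePtr & table, const std::string & query_id)
{
    auto alter_lock = table->lockAlterIntention(query_id);
    /// ... prepare the new metadata ...
    table->lockStructureExclusively(alter_lock, query_id);
    /// ... apply the change while the exclusive lock is held ...
}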

View File

@ -1,18 +1,19 @@
#pragma once #pragma once
#include <Common/Exception.h>
#include <Common/RWLock.h>
#include <Core/Names.h> #include <Core/Names.h>
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Storages/ITableDeclaration.h>
#include <Storages/TableStructureLockHolder.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/CancellationCode.h> #include <Interpreters/CancellationCode.h>
#include <shared_mutex> #include <Storages/IStorage_fwd.h>
#include <memory> #include <Storages/SelectQueryInfo.h>
#include <optional> #include <Storages/TableStructureLockHolder.h>
#include <Common/ActionLock.h> #include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
#include <optional>
#include <shared_mutex>
namespace DB namespace DB
@ -25,22 +26,11 @@ namespace ErrorCodes
} }
class Context; class Context;
class IBlockInputStream;
class IBlockOutputStream;
using StorageActionBlockType = size_t; using StorageActionBlockType = size_t;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class ASTCreateQuery; class ASTCreateQuery;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using StorageWeakPtr = std::weak_ptr<IStorage>;
struct Settings; struct Settings;
class AlterCommands; class AlterCommands;
@ -48,108 +38,105 @@ class MutationCommands;
class PartitionCommands; class PartitionCommands;
/** Storage. Responsible for /** Storage. Describes the table. Responsible for
* - storage of the table data; * - storage of the table data;
* - the definition in which files (or not in files) the data is stored; * - the definition in which files (or not in files) the data is stored;
* - data lookups and appends; * - data lookups and appends;
* - data storage structure (compression, etc.) * - data storage structure (compression, etc.)
* - concurrent access to data (locks, etc.) * - concurrent access to data (locks, etc.)
*/ */
class IStorage : public std::enable_shared_from_this<IStorage>, private boost::noncopyable, public ITableDeclaration class IStorage : public std::enable_shared_from_this<IStorage>
{ {
public: public:
IStorage() = default;
explicit IStorage(ColumnsDescription columns_);
virtual ~IStorage() = default;
IStorage(const IStorage &) = delete;
IStorage & operator=(const IStorage &) = delete;
/// The main name of the table type (for example, StorageMergeTree). /// The main name of the table type (for example, StorageMergeTree).
virtual std::string getName() const = 0; virtual std::string getName() const = 0;
/// The name of the table. /// The name of the table.
virtual std::string getTableName() const = 0; virtual std::string getTableName() const = 0;
virtual std::string getDatabaseName() const { return {}; } // FIXME: should be abstract method. virtual std::string getDatabaseName() const { return {}; } // FIXME: should be an abstract method!
/** Returns true if the storage receives data from a remote server or servers. */ /// Returns true if the storage receives data from a remote server or servers.
virtual bool isRemote() const { return false; } virtual bool isRemote() const { return false; }
/** Returns true if the storage supports queries with the SAMPLE section. */ /// Returns true if the storage supports queries with the SAMPLE section.
virtual bool supportsSampling() const { return false; } virtual bool supportsSampling() const { return false; }
/** Returns true if the storage supports queries with the FINAL section. */ /// Returns true if the storage supports queries with the FINAL section.
virtual bool supportsFinal() const { return false; } virtual bool supportsFinal() const { return false; }
/** Returns true if the storage supports queries with the PREWHERE section. */ /// Returns true if the storage supports queries with the PREWHERE section.
virtual bool supportsPrewhere() const { return false; } virtual bool supportsPrewhere() const { return false; }
/** Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas. */ /// Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas.
virtual bool supportsReplication() const { return false; } virtual bool supportsReplication() const { return false; }
/** Returns true if the storage supports deduplication of inserted data blocks . */ /// Returns true if the storage supports deduplication of inserted data blocks.
virtual bool supportsDeduplication() const { return false; } virtual bool supportsDeduplication() const { return false; }
public: /// thread-unsafe part. lockStructure must be acquired
const ColumnsDescription & getColumns() const;
void setColumns(ColumnsDescription columns_);
const IndicesDescription & getIndices() const;
void setIndices(IndicesDescription indices_);
/// NOTE: these methods should include virtual columns,
/// but should NOT include ALIAS columns (they are treated separately).
virtual NameAndTypePair getColumn(const String & column_name) const;
virtual bool hasColumn(const String & column_name) const;
Block getSampleBlock() const;
Block getSampleBlockNonMaterialized() const;
Block getSampleBlockForColumns(const Names & column_names) const; /// including virtual and alias columns.
/// Verify that all the requested names are in the table and are set correctly:
/// list of names is not empty and the names do not repeat.
void check(const Names & column_names) const;
/// Check that all the requested names are in the table and have the correct types.
void check(const NamesAndTypesList & columns) const;
/// Check that all names from the intersection of `names` and `columns` are in the table and have the same types.
void check(const NamesAndTypesList & columns, const Names & column_names) const;
/// Check that the data block contains all the columns of the table with the correct types,
/// contains only the columns of the table, and all the columns are different.
/// If |need_all| is set, then checks that all the columns of the table are in the block.
void check(const Block & block, bool need_all = false) const;
private:
ColumnsDescription columns;
IndicesDescription indices;
public:
/// Acquire this lock if you need the table structure to remain constant during the execution of /// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table /// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge). /// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id) TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id);
{
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you /// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock. /// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention(const String & query_id) TableStructureWriteLockHolder lockAlterIntention(const String & query_id);
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
/// Upgrade alter intention lock and make sure that no new data is inserted into the table. /// Upgrade alter intention lock and make sure that no new data is inserted into the table.
/// This is used by the ALTER MODIFY of the MergeTree storage to consistently determine /// This is used by the ALTER MODIFY of the MergeTree storage to consistently determine
/// the set of parts that needs to be altered. /// the set of parts that needs to be altered.
void lockNewDataStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id) void lockNewDataStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id);
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.",
ErrorCodes::LOGICAL_ERROR);
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
}
/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries /// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed. /// to ensure that no other query uses the table structure and it can be safely changed.
void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id) void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id);
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.",
ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
}
/// Acquire the full exclusive lock immediately. No other queries can run concurrently. /// Acquire the full exclusive lock immediately. No other queries can run concurrently.
TableStructureWriteLockHolder lockExclusively(const String & query_id) TableStructureWriteLockHolder lockExclusively(const String & query_id);
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
return result;
}
/** Returns stage to which query is going to be processed in read() function. /** Returns stage to which query is going to be processed in read() function.
* (Normally, the function only reads the columns from the list, but in other cases, * (Normally, the function only reads the columns from the list, but in other cases,
@ -335,8 +322,11 @@ public:
/// Returns additional columns that need to be read for FINAL to work. /// Returns additional columns that need to be read for FINAL to work.
virtual Names getColumnsRequiredForFinal() const { return {}; } virtual Names getColumnsRequiredForFinal() const { return {}; }
using ITableDeclaration::ITableDeclaration; protected:
using std::enable_shared_from_this<IStorage>::shared_from_this; /// Returns whether the column is virtual - by default all columns are real.
/// Initially reserved virtual column name may be shadowed by real column.
/// Returns false even for non-existent non-virtual columns.
virtual bool isVirtualColumn(const String & /* column_name */) const { return false; }
private: private:
/// You always need to take the next three locks in this order. /// You always need to take the next three locks in this order.
@ -357,7 +347,4 @@ private:
mutable RWLock structure_lock = RWLockImpl::create(); mutable RWLock structure_lock = RWLockImpl::create();
}; };
/// table name -> table
using Tables = std::map<String, StoragePtr>;
} }

View File

@ -0,0 +1,17 @@
#pragma once
#include <Core/Types.h>
#include <map>
#include <memory>
namespace DB
{
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using StorageWeakPtr = std::weak_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
}
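Illustrative only: with the new forward-declaration header, code that merely stores or passes a StoragePtr no longer needs the full IStorage.h (the holder type below is a hypothetical example, not in the codebase):
#pragma once
#include <Storages/IStorage_fwd.h>
namespace DB
{
/// shared_ptr works fine with the incomplete IStorage type declared in the _fwd header.
struct SomeTableHolder
{
    StoragePtr table;
    String name;
};
}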

View File

@ -1,290 +0,0 @@
#include <Storages/ITableDeclaration.h>
#include <Common/Exception.h>
#include <boost/range/join.hpp>
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
#include <unordered_set>
#include <sstream>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
extern const int TYPE_MISMATCH;
extern const int DUPLICATE_COLUMN;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
}
const ColumnsDescription & ITableDeclaration::getColumns() const
{
return columns;
}
const IndicesDescription & ITableDeclaration::getIndices() const
{
return indices;
}
void ITableDeclaration::setColumns(ColumnsDescription columns_)
{
if (columns_.getOrdinary().empty())
throw Exception("Empty list of columns passed", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
columns = std::move(columns_);
}
void ITableDeclaration::setIndices(IndicesDescription indices_)
{
indices = std::move(indices_);
}
void ITableDeclaration::setConstraints(ConstraintsDescription constraints_)
{
constraints = std::move(constraints_);
}
bool ITableDeclaration::hasColumn(const String & column_name) const
{
return getColumns().hasPhysical(column_name); /// By default, we assume that there are no virtual columns in the storage.
}
NameAndTypePair ITableDeclaration::getColumn(const String & column_name) const
{
return getColumns().getPhysical(column_name); /// By default, we assume that there are no virtual columns in the storage.
}
Block ITableDeclaration::getSampleBlock() const
{
Block res;
for (const auto & col : getColumns().getAllPhysical())
res.insert({ col.type->createColumn(), col.type, col.name });
return res;
}
Block ITableDeclaration::getSampleBlockNonMaterialized() const
{
Block res;
for (const auto & col : getColumns().getOrdinary())
res.insert({ col.type->createColumn(), col.type, col.name });
return res;
}
Block ITableDeclaration::getSampleBlockForColumns(const Names & column_names) const
{
Block res;
NamesAndTypesList all_columns = getColumns().getAll();
std::unordered_map<String, DataTypePtr> columns_map;
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
for (const auto & name : column_names)
{
auto it = columns_map.find(name);
if (it != columns_map.end())
{
res.insert({ it->second->createColumn(), it->second, it->first });
}
else
{
/// Virtual columns.
NameAndTypePair elem = getColumn(name);
res.insert({ elem.type->createColumn(), elem.type, elem.name });
}
}
return res;
}
static std::string listOfColumns(const NamesAndTypesList & available_columns)
{
std::stringstream s;
for (auto it = available_columns.begin(); it != available_columns.end(); ++it)
{
if (it != available_columns.begin())
s << ", ";
s << it->name;
}
return s.str();
}
using NamesAndTypesMap = GOOGLE_NAMESPACE::dense_hash_map<StringRef, const IDataType *, StringRefHash>;
static NamesAndTypesMap & getColumnsMapImpl(NamesAndTypesMap & res) { return res; }
template <typename Arg, typename... Args>
static NamesAndTypesMap & getColumnsMapImpl(NamesAndTypesMap & res, const Arg & arg, const Args &... args)
{
static_assert(std::is_same_v<Arg, NamesAndTypesList>, "getColumnsMap requires arguments of type NamesAndTypesList");
for (const auto & column : arg)
res.insert({column.name, column.type.get()});
return getColumnsMapImpl(res, args...);
}
template <typename... Args>
static NamesAndTypesMap getColumnsMap(const Args &... args)
{
NamesAndTypesMap res;
res.set_empty_key(StringRef());
return getColumnsMapImpl(res, args...);
}
void ITableDeclaration::check(const Names & column_names) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
if (column_names.empty())
throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
const auto columns_map = getColumnsMap(available_columns);
using UniqueStrings = GOOGLE_NAMESPACE::dense_hash_set<StringRef, StringRefHash>;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const auto & name : column_names)
{
if (columns_map.end() == columns_map.find(name))
throw Exception("There is no column with name " + name + " in table. There are columns: " + listOfColumns(available_columns),
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (unique_names.end() != unique_names.find(name))
throw Exception("Column " + name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(name);
}
}
void ITableDeclaration::check(const NamesAndTypesList & provided_columns) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const auto columns_map = getColumnsMap(available_columns);
using UniqueStrings = GOOGLE_NAMESPACE::dense_hash_set<StringRef, StringRefHash>;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const NameAndTypePair & column : provided_columns)
{
NamesAndTypesMap::const_iterator it = columns_map.find(column.name);
if (columns_map.end() == it)
throw Exception("There is no column with name " + column.name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!column.type->equals(*it->second))
throw Exception("Type mismatch for column " + column.name + ". Column has type "
+ it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(column.name))
throw Exception("Column " + column.name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(column.name);
}
}
void ITableDeclaration::check(const NamesAndTypesList & provided_columns, const Names & column_names) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const auto available_columns_map = getColumnsMap(available_columns);
const NamesAndTypesMap & provided_columns_map = getColumnsMap(provided_columns);
if (column_names.empty())
throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
using UniqueStrings = GOOGLE_NAMESPACE::dense_hash_set<StringRef, StringRefHash>;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const String & name : column_names)
{
NamesAndTypesMap::const_iterator it = provided_columns_map.find(name);
if (provided_columns_map.end() == it)
continue;
NamesAndTypesMap::const_iterator jt = available_columns_map.find(name);
if (available_columns_map.end() == jt)
throw Exception("There is no column with name " + name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!it->second->equals(*jt->second))
throw Exception("Type mismatch for column " + name + ". Column has type "
+ jt->second->getName() + ", got type " + it->second->getName(), ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(name))
throw Exception("Column " + name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(name);
}
}
void ITableDeclaration::check(const Block & block, bool need_all) const
{
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
const auto columns_map = getColumnsMap(available_columns);
NameSet names_in_block;
block.checkNumberOfRows();
for (const auto & column : block)
{
if (names_in_block.count(column.name))
throw Exception("Duplicate column " + column.name + " in block",
ErrorCodes::DUPLICATE_COLUMN);
names_in_block.insert(column.name);
NamesAndTypesMap::const_iterator it = columns_map.find(column.name);
if (columns_map.end() == it)
throw Exception("There is no column with name " + column.name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!column.type->equals(*it->second))
throw Exception("Type mismatch for column " + column.name + ". Column has type "
+ it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH);
}
if (need_all && names_in_block.size() < columns_map.size())
{
for (NamesAndTypesList::const_iterator it = available_columns.begin(); it != available_columns.end(); ++it)
{
if (!names_in_block.count(it->name))
throw Exception("Expected column " + it->name, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
}
ITableDeclaration::ITableDeclaration(ColumnsDescription columns_)
{
setColumns(std::move(columns_));
}
}

View File

@ -1,67 +0,0 @@
#pragma once

#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>


namespace DB
{

/** Description of the table.
  * Is not thread safe. See IStorage::lockStructure ().
  */
class ITableDeclaration
{
public:
    const ColumnsDescription & getColumns() const;
    void setColumns(ColumnsDescription columns_);

    const IndicesDescription & getIndices() const;
    void setIndices(IndicesDescription indices_);

    virtual const ConstraintsDescription & getConstraints() const { return constraints; }
    virtual void setConstraints(ConstraintsDescription constraints_);

    /// NOTE: These methods should include virtual columns, but should NOT include ALIAS columns
    /// (they are treated separately).
    virtual NameAndTypePair getColumn(const String & column_name) const;
    virtual bool hasColumn(const String & column_name) const;

    Block getSampleBlock() const;
    Block getSampleBlockNonMaterialized() const;

    /// Including virtual and alias columns.
    Block getSampleBlockForColumns(const Names & column_names) const;

    /** Verify that all the requested names are in the table and are set correctly.
      * (the list of names is not empty and the names do not repeat)
      */
    void check(const Names & column_names) const;

    /** Check that all the requested names are in the table and have the correct types.
      */
    void check(const NamesAndTypesList & columns) const;

    /** Check that all names from the intersection of `names` and `columns` are in the table and have the same types.
      */
    void check(const NamesAndTypesList & columns, const Names & column_names) const;

    /** Check that the data block contains all the columns of the table with the correct types,
      * contains only the columns of the table, and all the columns are different.
      * If need_all, checks that all the columns of the table are in the block.
      */
    void check(const Block & block, bool need_all = false) const;

    ITableDeclaration() = default;
    explicit ITableDeclaration(ColumnsDescription columns_);
    virtual ~ITableDeclaration() = default;

private:
    ColumnsDescription columns;
    IndicesDescription indices;
    ConstraintsDescription constraints;
};

}

View File

@ -23,7 +23,8 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
     M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
     M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
     M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
-    M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block")
+    M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
+    M(SettingUInt64, kafka_commit_every_batch, 1, "Commit every consumed and handled batch instead of a single commit after writing a whole block")

 DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)
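
The new kafka_commit_every_batch entry slots into the same M(type, name, default, description) X-macro list as the existing settings, so its declaration, default value, and description live in one place. A simplified stand-in showing how such a list can expand into struct members (a sketch of the X-macro technique, not the real SettingsCollection machinery; the macro and struct names below are made up):

#include <cstdint>
#include <iostream>

// Simplified stand-in for a SettingsCollection-style X-macro list.
#define SKETCH_LIST_OF_KAFKA_SETTINGS(M) \
    M(uint64_t, kafka_max_block_size, 0, "The maximum block size per table") \
    M(uint64_t, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages") \
    M(uint64_t, kafka_commit_every_batch, 1, "Commit every consumed and handled batch")

struct KafkaSettingsSketch
{
// Each M(...) entry expands into one member initialised with its default value.
#define DECLARE_MEMBER(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    SKETCH_LIST_OF_KAFKA_SETTINGS(DECLARE_MEMBER)
#undef DECLARE_MEMBER
};

int main()
{
    KafkaSettingsSketch settings;
    std::cout << "kafka_commit_every_batch = " << settings.kafka_commit_every_batch << '\n';
}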

View File

@ -40,7 +40,7 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
 /// Do commit messages implicitly after we processed the previous batch.
 bool ReadBufferFromKafkaConsumer::nextImpl()
 {
-    /// NOTE: ReadBuffer was implemented with a immutable buffer contents in mind.
+    /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
     /// If we failed to poll any message once - don't try again.
     /// Otherwise, the |poll_timeout| expectations get flawn.
     if (stalled)
@ -48,6 +48,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     if (current == messages.end())
     {
-        commit();
+        if (intermediate_commit)
+            commit();
         messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
         current = messages.begin();
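
This change makes the implicit commit conditional: with intermediate_commit set, the previously handled batch is committed before the next poll; otherwise a single commit is expected only after the whole block has been written. A rough sketch of that control flow using plain STL callables instead of the cppkafka consumer (poll and commit below are illustrative callbacks, not the library API):

#include <functional>
#include <iostream>
#include <string>
#include <vector>

using Messages = std::vector<std::string>;

// Drain batches from `poll` until it returns an empty batch.
// If intermediate_commit is set, commit after every handled batch;
// otherwise commit once at the end.
void consume(const std::function<Messages()> & poll,
             const std::function<void()> & commit,
             bool intermediate_commit)
{
    while (true)
    {
        Messages batch = poll();
        if (batch.empty())
            break;

        for (const auto & message : batch)
            std::cout << "handled: " << message << '\n';

        if (intermediate_commit)
            commit();
    }

    if (!intermediate_commit)
        commit();
}

int main()
{
    int calls = 0;
    auto poll = [&]() -> Messages { return calls++ < 2 ? Messages{"a", "b"} : Messages{}; };
    auto commit = [] { std::cout << "commit\n"; };

    consume(poll, commit, /*intermediate_commit=*/true);
}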

View File

@ -15,12 +15,14 @@ using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
 class ReadBufferFromKafkaConsumer : public ReadBuffer
 {
 public:
-    ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_)
+    ReadBufferFromKafkaConsumer(
+        ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, bool intermediate_commit_)
         : ReadBuffer(nullptr, 0)
         , consumer(consumer_)
         , log(log_)
         , batch_size(max_batch_size)
         , poll_timeout(poll_timeout_)
+        , intermediate_commit(intermediate_commit_)
         , current(messages.begin())
     {
     }
@ -39,6 +41,7 @@ private:
     const size_t batch_size = 1;
     const size_t poll_timeout = 0;
     bool stalled = false;
+    bool intermediate_commit = true;

     Messages messages;
     Messages::const_iterator current;

View File

@ -71,7 +71,8 @@ StorageKafka::StorageKafka(
     const ColumnsDescription & columns_,
     const String & brokers_, const String & group_, const Names & topics_,
     const String & format_name_, char row_delimiter_, const String & schema_name_,
-    size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken_)
+    size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken_,
+    bool intermediate_commit_)
     : IStorage{columns_},
     table_name(table_name_), database_name(database_name_), global_context(context_),
     topics(global_context.getMacros()->expand(topics_)),
@ -82,7 +83,7 @@ StorageKafka::StorageKafka(
     schema_name(global_context.getMacros()->expand(schema_name_)),
     num_consumers(num_consumers_), max_block_size(max_block_size_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
     semaphore(0, num_consumers_),
-    skip_broken(skip_broken_)
+    skip_broken(skip_broken_), intermediate_commit(intermediate_commit_)
 {
     task = global_context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
     task->deactivate();
@ -212,7 +213,8 @@ BufferPtr StorageKafka::createBuffer()
         batch_size = settings.max_block_size.value;
     size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

-    return std::make_shared<DelimitedReadBuffer>(std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout), row_delimiter);
+    return std::make_shared<DelimitedReadBuffer>(
+        std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit), row_delimiter);
 }

 BufferPtr StorageKafka::claimBuffer()
@ -380,6 +382,7 @@ void registerStorageKafka(StorageFactory & factory)
           * - Number of consumers
           * - Max block size for background consumption
           * - Skip (at least) unreadable messages number
+          * - Do intermediate commits when the batch consumed and handled
           */

         // Check arguments and settings
@ -414,6 +417,8 @@ void registerStorageKafka(StorageFactory & factory)
         CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
         CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size)
         CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages)
+        CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch)

         #undef CHECK_KAFKA_STORAGE_ARGUMENT

         // Get and check broker list
@ -598,9 +603,27 @@ void registerStorageKafka(StorageFactory & factory)
             skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages.value);
         }

+        bool intermediate_commit = true;
+        if (args_count >= 10)
+        {
+            const auto * ast = engine_args[9]->as<ASTLiteral>();
+            if (ast && ast->value.getType() == Field::Types::UInt64)
+            {
+                intermediate_commit = static_cast<bool>(safeGet<UInt64>(ast->value));
+            }
+            else
+            {
+                throw Exception("Flag for committing every batch must be 0 or 1", ErrorCodes::BAD_ARGUMENTS);
+            }
+        }
+        else if (kafka_settings.kafka_commit_every_batch.changed)
+        {
+            intermediate_commit = static_cast<bool>(kafka_settings.kafka_commit_every_batch);
+        }
+
         return StorageKafka::create(
             args.table_name, args.database_name, args.context, args.columns,
-            brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken);
+            brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
     });
 }
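
The argument handling above gives the tenth positional engine argument priority over the kafka_commit_every_batch setting, which is only consulted when it was explicitly changed; otherwise the default of committing every batch stays in effect. A condensed sketch of that precedence rule, with std::optional standing in for the ASTLiteral check (the types and names below are illustrative):

#include <cstdint>
#include <iostream>
#include <optional>

// Stand-in for the named setting: a value plus a "was it explicitly set" flag.
struct CommitSetting
{
    bool value = true;
    bool changed = false;
};

// The positional engine argument (if present) wins; otherwise an explicitly
// changed setting wins; otherwise keep the default of committing every batch.
bool resolveIntermediateCommit(std::optional<uint64_t> engine_arg, const CommitSetting & setting)
{
    if (engine_arg)
        return *engine_arg != 0;
    if (setting.changed)
        return setting.value;
    return true;
}

int main()
{
    std::cout << resolveIntermediateCommit(std::nullopt, {false, true}) << '\n';  // 0: the changed setting wins
    std::cout << resolveIntermediateCommit(1, {false, true}) << '\n';             // 1: the positional argument wins
}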

View File

@ -77,6 +77,8 @@ private:
     size_t skip_broken;

+    bool intermediate_commit;
+
     // Stream thread
     BackgroundSchedulePool::TaskHolder task;
     std::atomic<bool> stream_cancelled{false};
@ -99,7 +101,8 @@ protected:
         const ColumnsDescription & columns_,
         const String & brokers_, const String & group_, const Names & topics_,
         const String & format_name_, char row_delimiter_, const String & schema_name_,
-        size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken);
+        size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
+        bool intermediate_commit_);
 };

 }

View File

@ -2,7 +2,7 @@
 #include <Interpreters/InterserverIOHandler.h>
 #include <Storages/MergeTree/MergeTreeData.h>
-#include <Storages/IStorage.h>
+#include <Storages/IStorage_fwd.h>
 #include <IO/HashingWriteBuffer.h>
 #include <IO/copyData.h>
 #include <IO/ConnectionTimeouts.h>

View File

@ -1,6 +1,8 @@
 #pragma once

 #include <Parsers/ASTAlterQuery.h>
+#include <Storages/IStorage_fwd.h>

 #include <optional>
 #include <unordered_map>
@ -8,7 +10,6 @@
 namespace DB
 {

-class IStorage;
 class Context;
 class WriteBuffer;
 class ReadBuffer;
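
Several headers in this commit stop including Storages/IStorage.h and pull in Storages/IStorage_fwd.h instead, trading the full class definition for cheap forward declarations and cutting compile-time coupling. As a sketch of the general shape such a *_fwd.h header usually has (the actual contents of IStorage_fwd.h are not reproduced here and may differ):

// Forward-declaration header: declarations and aliases only,
// so including this file stays cheap compared to the full IStorage.h.
#pragma once

#include <memory>

namespace DB
{

class IStorage;

using StoragePtr = std::shared_ptr<IStorage>;

}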

View File

@ -1,16 +1,17 @@
 #pragma once

-#include <Core/Types.h>
 #include <Core/Field.h>
+#include <Core/Types.h>
 #include <Parsers/IAST.h>
-#include <vector>
+#include <Storages/IStorage_fwd.h>

 #include <optional>
+#include <vector>

 namespace DB
 {

-class IStorage;
 class ASTAlterCommand;

 struct PartitionCommand

View File

@ -1,7 +1,9 @@
 #pragma once

 #include <Common/NamePrompter.h>
-#include <Storages/IStorage.h>
+#include <Parsers/IAST_fwd.h>
+#include <Storages/ColumnsDescription.h>
+#include <Storages/IStorage_fwd.h>
 #include <ext/singleton.h>

 #include <unordered_map>

View File

@ -11,6 +11,8 @@
 #include <Interpreters/InterpreterRenameQuery.h>
 #include <Interpreters/DatabaseAndTableWithAlias.h>
 #include <Interpreters/AddDefaultDatabaseVisitor.h>
+#include <DataStreams/IBlockInputStream.h>
+#include <DataStreams/IBlockOutputStream.h>

 #include <Storages/StorageFactory.h>

View File

@ -57,18 +57,26 @@ StorageMerge::StorageMerge(
 }

-/// NOTE Structure of underlying tables as well as their set are not constant,
+/// NOTE: structure of underlying tables as well as their set are not constant,
 /// so the results of these methods may become obsolete after the call.

+bool StorageMerge::isVirtualColumn(const String & column_name) const
+{
+    if (column_name != "_table")
+        return false;
+
+    return !IStorage::hasColumn(column_name);
+}
+
 NameAndTypePair StorageMerge::getColumn(const String & column_name) const
 {
+    if (IStorage::hasColumn(column_name))
+        return IStorage::getColumn(column_name);
+
     /// virtual column of the Merge table itself
     if (column_name == "_table")
         return { column_name, std::make_shared<DataTypeString>() };

-    if (IStorage::hasColumn(column_name))
-        return IStorage::getColumn(column_name);
-
     /// virtual (and real) columns of the underlying tables
     auto first_table = getFirstTable([](auto &&) { return true; });
     if (first_table)
@ -188,7 +196,7 @@ BlockInputStreams StorageMerge::read(
     for (const auto & column_name : column_names)
     {
-        if (column_name == "_table")
+        if (isVirtualColumn(column_name))
             has_table_virtual_column = true;
         else
             real_column_names.push_back(column_name);
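
With the new isVirtualColumn() and the reordered getColumn(), the resolution order becomes explicit: a real column defined by the underlying tables shadows the Merge engine's _table virtual column, and _table is only treated as virtual when no real column of that name exists. A self-contained sketch of that lookup order with a plain map in place of the storage metadata (the names below are illustrative):

#include <iostream>
#include <map>
#include <optional>
#include <string>

using ColumnTypes = std::map<std::string, std::string>;

// "_table" only counts as a virtual column when the table defines
// no real column with that name.
bool isVirtualTableColumn(const std::string & name, const ColumnTypes & real_columns)
{
    return name == "_table" && real_columns.count(name) == 0;
}

// Real columns are resolved first; the virtual "_table" column (of type String)
// is only the fallback.
std::optional<std::string> getColumnType(const std::string & name, const ColumnTypes & real_columns)
{
    if (auto it = real_columns.find(name); it != real_columns.end())
        return it->second;
    if (name == "_table")
        return std::string("String");
    return std::nullopt;
}

int main()
{
    ColumnTypes columns{{"id", "UInt64"}};
    std::cout << getColumnType("_table", columns).value() << '\n';  // String: virtual fallback
    std::cout << isVirtualTableColumn("_table", columns) << '\n';   // 1
    std::cout << isVirtualTableColumn("id", columns) << '\n';       // 0
}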

View File

@ -84,6 +84,8 @@ protected:
     void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
                                 BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage);
+
+    bool isVirtualColumn(const String & column_name) const override;
 };

 }

View File

@ -259,7 +259,7 @@ NameAndTypePair StorageSystemPartsBase::getColumn(const String & column_name) co
if (column_name == "_state") if (column_name == "_state")
return NameAndTypePair("_state", std::make_shared<DataTypeString>()); return NameAndTypePair("_state", std::make_shared<DataTypeString>());
return ITableDeclaration::getColumn(column_name); return IStorage::getColumn(column_name);
} }
bool StorageSystemPartsBase::hasColumn(const String & column_name) const bool StorageSystemPartsBase::hasColumn(const String & column_name) const
@ -267,7 +267,7 @@ bool StorageSystemPartsBase::hasColumn(const String & column_name) const
if (column_name == "_state") if (column_name == "_state")
return true; return true;
return ITableDeclaration::hasColumn(column_name); return IStorage::hasColumn(column_name);
} }
StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns_) StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns_)

View File

@ -1,7 +1,6 @@
 #pragma once

 #include <ext/shared_ptr_helper.h>
-#include <Storages/IStorage.h>
 #include <Storages/System/StorageSystemPartsBase.h>

View File

@ -0,0 +1,75 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>

#include <Storages/transformQueryForExternalDatabase.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Databases/DatabaseMemory.h>
#include <Storages/StorageMemory.h>
#include <Functions/registerFunctions.h>


using namespace DB;


/// NOTE How to do better?
struct State
{
    Context context{Context::createGlobal()};
    NamesAndTypesList columns{{"column", std::make_shared<DataTypeUInt8>()}};

    State()
    {
        registerFunctions();
        DatabasePtr database = std::make_shared<DatabaseMemory>("test");
        database->attachTable("table", StorageMemory::create("table", ColumnsDescription{columns}));
        context.addDatabase("test", database);
        context.setCurrentDatabase("test");
    }
};

State & state()
{
    static State res;
    return res;
}


void check(const std::string & query, const std::string & expected, const Context & context, const NamesAndTypesList & columns)
{
    ParserSelectQuery parser;
    ASTPtr ast = parseQuery(parser, query, 1000);
    std::string transformed_query = transformQueryForExternalDatabase(*ast, columns, IdentifierQuotingStyle::DoubleQuotes, "test", "table", context);

    EXPECT_EQ(transformed_query, expected);
}


TEST(TransformQueryForExternalDatabase, InWithSingleElement)
{
    check("SELECT column FROM test.table WHERE 1 IN (1)",
          "SELECT \"column\" FROM \"test\".\"table\" WHERE 1 IN (1)",
          state().context, state().columns);
    check("SELECT column FROM test.table WHERE column IN (1, 2)",
          "SELECT \"column\" FROM \"test\".\"table\" WHERE \"column\" IN (1, 2)",
          state().context, state().columns);
    check("SELECT column FROM test.table WHERE column NOT IN ('hello', 'world')",
          "SELECT \"column\" FROM \"test\".\"table\" WHERE \"column\" NOT IN ('hello', 'world')",
          state().context, state().columns);
}

TEST(TransformQueryForExternalDatabase, Like)
{
    check("SELECT column FROM test.table WHERE column LIKE '%hello%'",
          "SELECT \"column\" FROM \"test\".\"table\" WHERE \"column\" LIKE '%hello%'",
          state().context, state().columns);
    check("SELECT column FROM test.table WHERE column NOT LIKE 'w%rld'",
          "SELECT \"column\" FROM \"test\".\"table\" WHERE \"column\" NOT LIKE 'w%rld'",
          state().context, state().columns);
}
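
The test fixture relies on a function-local static (state()) so the global Context and the in-memory test.table are built once, lazily and thread-safely, and then shared by every test case. A generic sketch of that lazy-initialization pattern outside of ClickHouse:

#include <iostream>
#include <string>

// Expensive-to-construct shared fixture.
struct Fixture
{
    std::string name = "initialized once";

    Fixture() { std::cout << "Fixture constructed\n"; }
};

// Function-local static: constructed on first call, thread-safe since C++11,
// and reused by every subsequent caller.
Fixture & fixture()
{
    static Fixture instance;
    return instance;
}

int main()
{
    std::cout << fixture().name << '\n';  // constructs the fixture here
    std::cout << fixture().name << '\n';  // reuses the same instance
}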

Some files were not shown because too many files have changed in this diff.