Merge branch 'master' of github.com:ClickHouse/ClickHouse into cache-dictionary

This commit is contained in:
Nikita Mikhaylov 2019-12-26 12:01:43 +03:00
commit ea6c894656
205 changed files with 1794 additions and 1216 deletions

2
contrib/libcxxabi vendored

@ -1 +1 @@
Subproject commit c26cf36f8387c5edf2cabb4a630f0975c35aa9fb
Subproject commit 7aacd45028ecf5f1c39985ecbd4f67eed9b11ce5

View File

@ -186,8 +186,8 @@ endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp)
list (APPEND dbms_headers src/Functions/IFunctionImpl.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h)
list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp src/Functions/extractTimeZoneFromFunctionArguments.cpp)
list (APPEND dbms_headers src/Functions/IFunctionImpl.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h src/Functions/extractTimeZoneFromFunctionArguments.h)
list (APPEND dbms_sources
src/AggregateFunctions/AggregateFunctionFactory.cpp

View File

@ -1,5 +1,6 @@
<yandex>
<logger>
<console>true</console>
<log remove="remove"/>
<errorlog remove="remove"/>
</logger>

View File

@ -9,6 +9,7 @@
#include <ext/scope_guard.h>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/algorithm/find_first_of.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <ifaddrs.h>
@ -23,29 +24,64 @@ namespace ErrorCodes
namespace
{
using IPAddress = Poco::Net::IPAddress;
using IPSubnet = AllowedClientHosts::IPSubnet;
const IPSubnet ALL_ADDRESSES{IPAddress{IPAddress::IPv6}, IPAddress{IPAddress::IPv6}};
const AllowedClientHosts::IPSubnet ALL_ADDRESSES = AllowedClientHosts::IPSubnet{IPAddress{IPAddress::IPv6}, IPAddress{IPAddress::IPv6}};
IPAddress toIPv6(const IPAddress & addr)
const IPAddress & getIPV6Loopback()
{
if (addr.family() == IPAddress::IPv6)
return addr;
if (addr.isLoopback())
return IPAddress("::1");
return IPAddress("::FFFF:" + addr.toString());
static const IPAddress ip("::1");
return ip;
}
IPAddress maskToIPv6(const IPAddress & mask)
bool isIPV4LoopbackMappedToIPV6(const IPAddress & ip)
{
if (mask.family() == IPAddress::IPv6)
return mask;
return IPAddress(96, IPAddress::IPv6) | toIPv6(mask);
static const IPAddress prefix("::ffff:127.0.0.0");
/// 104 == 128 - 24, we have to reset the lowest 24 bits of 128 before comparing with `prefix`
/// (IPv4 loopback means any IP from 127.0.0.0 to 127.255.255.255).
return (ip & IPAddress(104, IPAddress::IPv6)) == prefix;
}
/// Produces the IPv6 form of `ip`.
/// An IPv4 address is rewritten as an IPv4-mapped IPv6 address ("::ffff:a.b.c.d").
/// Any mapped IPv4 loopback ("::ffff:127.x.y.z"), whether it was just produced here or
/// was passed in already in IPv6 form, is collapsed to the IPv6 loopback "::1".
IPAddress toIPv6(const IPAddress & ip)
{
    const bool already_v6 = (ip.family() == IPAddress::IPv6);
    IPAddress result = already_v6 ? ip : IPAddress("::ffff:" + ip.toString());

    // ::ffff:127.XX.XX.XX -> ::1
    if (isIPV4LoopbackMappedToIPV6(result))
        return getIPV6Loopback();

    return result;
}
/// Produces the IPv6 form of `subnet`.
/// IPv4 prefixes and masks are rewritten as IPv4-mapped IPv6 values; for an IPv4 mask the
/// upper 96 bits are additionally set so that the mapped "::ffff:" part is always matched.
/// The prefix is then reduced to its network part, and a subnet whose prefix is a mapped
/// IPv4 loopback is replaced by the single-address subnet "::1/128".
IPSubnet toIPv6(const IPSubnet & subnet)
{
    const auto mapped_v4 = [](const IPAddress & v4) { return IPAddress("::ffff:" + v4.toString()); };

    IPSubnet result;

    result.prefix = (subnet.prefix.family() == IPAddress::IPv6)
        ? subnet.prefix
        : mapped_v4(subnet.prefix);

    result.mask = (subnet.mask.family() == IPAddress::IPv6)
        ? subnet.mask
        : (IPAddress(96, IPAddress::IPv6) | mapped_v4(subnet.mask));

    /// Drop the host bits of the prefix.
    result.prefix = result.prefix & result.mask;

    // ::ffff:127.XX.XX.XX -> ::1
    if (isIPV4LoopbackMappedToIPV6(result.prefix))
        result = {getIPV6Loopback(), IPAddress(128, IPAddress::IPv6)};

    return result;
}
/// Helper function for isAddressOfHost().
bool isAddressOfHostImpl(const IPAddress & address, const String & host)
{
IPAddress addr_v6 = toIPv6(address);
@ -93,15 +129,15 @@ namespace
return false;
}
/// Cached version of isAddressOfHostImpl(). We need to cache DNS requests.
/// Whether a specified address is one of the addresses of a specified host.
bool isAddressOfHost(const IPAddress & address, const String & host)
{
/// We need to cache DNS requests.
/// The cache is a function-local static, so one instance is shared by every call.
/// NOTE(review): presumably SimpleCache memoizes isAddressOfHostImpl() results per
/// (address, host) argument pair - confirm against SimpleCache before relying on it.
static SimpleCache<decltype(isAddressOfHostImpl), isAddressOfHostImpl> cache;
return cache(address, host);
}
/// Helper function for isAddressOfLocalhost().
std::vector<IPAddress> getAddressesOfLocalhostImpl()
{
std::vector<IPAddress> addresses;
@ -114,7 +150,7 @@ namespace
int err = getifaddrs(&ifa_begin);
if (err)
return {IPAddress{"127.0.0.1"}, IPAddress{"::1"}};
return {getIPV6Loopback()};
for (const ifaddrs * ifa = ifa_begin; ifa; ifa = ifa->ifa_next)
{
@ -134,15 +170,15 @@ namespace
return addresses;
}
/// Checks if a specified address pointers to the localhost.
bool isLocalAddress(const IPAddress & address)
/// Whether a specified address is one of the addresses of the localhost.
bool isAddressOfLocalhost(const IPAddress & address)
{
/// We need to cache DNS requests.
static const std::vector<IPAddress> local_addresses = getAddressesOfLocalhostImpl();
return boost::range::find(local_addresses, address) != local_addresses.end();
return boost::range::find(local_addresses, toIPv6(address)) != local_addresses.end();
}
/// Helper function for getHostByAddress().
String getHostByAddressImpl(const IPAddress & address)
{
Poco::Net::SocketAddress sock_addr(address, 0);
@ -160,10 +196,10 @@ namespace
return host;
}
/// Cached version of getHostByAddressImpl(). We need to cache DNS requests.
/// Returns the host name by its address.
String getHostByAddress(const IPAddress & address)
{
/// We need to cache DNS requests.
/// The cache is a function-local static, so one instance is shared by every call.
/// NOTE(review): presumably SimpleCache memoizes getHostByAddressImpl() results per
/// address - confirm against SimpleCache before relying on it.
static SimpleCache<decltype(getHostByAddressImpl), &getHostByAddressImpl> cache;
return cache(address);
}
@ -203,7 +239,7 @@ AllowedClientHosts::AllowedClientHosts(const AllowedClientHosts & src)
AllowedClientHosts & AllowedClientHosts::operator =(const AllowedClientHosts & src)
{
addresses = src.addresses;
loopback = src.loopback;
localhost = src.localhost;
subnets = src.subnets;
host_names = src.host_names;
host_regexps = src.host_regexps;
@ -212,28 +248,14 @@ AllowedClientHosts & AllowedClientHosts::operator =(const AllowedClientHosts & s
}
AllowedClientHosts::AllowedClientHosts(AllowedClientHosts && src)
{
*this = src;
}
AllowedClientHosts & AllowedClientHosts::operator =(AllowedClientHosts && src)
{
addresses = std::move(src.addresses);
loopback = src.loopback;
subnets = std::move(src.subnets);
host_names = std::move(src.host_names);
host_regexps = std::move(src.host_regexps);
compiled_host_regexps = std::move(src.compiled_host_regexps);
return *this;
}
AllowedClientHosts::AllowedClientHosts(AllowedClientHosts && src) = default;
AllowedClientHosts & AllowedClientHosts::operator =(AllowedClientHosts && src) = default;
void AllowedClientHosts::clear()
{
addresses.clear();
loopback = false;
localhost = false;
subnets.clear();
host_names.clear();
host_regexps.clear();
@ -250,10 +272,11 @@ bool AllowedClientHosts::empty() const
void AllowedClientHosts::addAddress(const IPAddress & address)
{
IPAddress addr_v6 = toIPv6(address);
if (boost::range::find(addresses, addr_v6) == addresses.end())
addresses.push_back(addr_v6);
if (boost::range::find(addresses, addr_v6) != addresses.end())
return;
addresses.push_back(addr_v6);
if (addr_v6.isLoopback())
loopback = true;
localhost = true;
}
@ -265,9 +288,7 @@ void AllowedClientHosts::addAddress(const String & address)
void AllowedClientHosts::addSubnet(const IPSubnet & subnet)
{
IPSubnet subnet_v6;
subnet_v6.prefix = toIPv6(subnet.prefix);
subnet_v6.mask = maskToIPv6(subnet.mask);
IPSubnet subnet_v6 = toIPv6(subnet);
if (subnet_v6.mask == IPAddress(128, IPAddress::IPv6))
{
@ -275,8 +296,6 @@ void AllowedClientHosts::addSubnet(const IPSubnet & subnet)
return;
}
subnet_v6.prefix = subnet_v6.prefix & subnet_v6.mask;
if (boost::range::find(subnets, subnet_v6) == subnets.end())
subnets.push_back(subnet_v6);
}
@ -314,8 +333,11 @@ void AllowedClientHosts::addSubnet(const String & subnet)
void AllowedClientHosts::addHostName(const String & host_name)
{
if (boost::range::find(host_names, host_name) == host_names.end())
host_names.push_back(host_name);
if (boost::range::find(host_names, host_name) != host_names.end())
return;
host_names.push_back(host_name);
if (boost::iequals(host_name, "localhost"))
localhost = true;
}
@ -360,7 +382,7 @@ bool AllowedClientHosts::contains(const IPAddress & address) const
if (boost::range::find(addresses, addr_v6) != addresses.end())
return true;
if (loopback && isLocalAddress(addr_v6))
if (localhost && isAddressOfLocalhost(addr_v6))
return true;
/// Check `ip_subnets`.

View File

@ -94,7 +94,7 @@ private:
void compileRegexps() const;
std::vector<IPAddress> addresses;
bool loopback = false;
bool localhost = false;
std::vector<IPSubnet> subnets;
std::vector<String> host_names;
std::vector<String> host_regexps;

View File

@ -84,8 +84,6 @@ public:
{
this->data(place).result.insertResultInto(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -130,8 +130,6 @@ public:
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -87,8 +87,6 @@ public:
column.getData().push_back(this->data(place).template result<ResultType>());
}
const char * getHeaderFilePath() const override { return __FILE__; }
protected:
UInt32 scale;
};

View File

@ -78,8 +78,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).value);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -154,11 +154,6 @@ public:
{
assert_cast<ColumnFloat64 &>(to).getData().push_back(getBoundingRatio(data(place)));
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

View File

@ -33,11 +33,6 @@ public:
return "categoricalInformationValue";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void create(AggregateDataPtr place) const override
{
memset(place, 0, sizeOfData());

View File

@ -63,8 +63,6 @@ public:
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
/// Reset the state to specified value. This function is not the part of common interface.
void set(AggregateDataPtr place, UInt64 new_count)
{
@ -115,8 +113,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -145,8 +145,6 @@ public:
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -247,8 +247,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -136,8 +136,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -400,8 +398,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE

View File

@ -203,8 +203,6 @@ public:
to_offsets.push_back(to_offsets.back() + result_array_size);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -192,8 +192,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE

View File

@ -52,8 +52,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -119,8 +117,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
template <typename Data>

View File

@ -118,8 +118,6 @@ public:
for (auto it = set.begin(); it != set.end(); ++it, ++i)
data_to[old_size + i] = it->getValue();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -255,8 +253,6 @@ public:
deserializeAndInsert(elem.getValue(), data_to);
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
template <>

View File

@ -369,8 +369,6 @@ public:
offsets_to.push_back(to_tuple.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
String getName() const override { return "histogram"; }
};

View File

@ -109,8 +109,6 @@ public:
{
return nested_func->isState();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -394,8 +394,6 @@ public:
this->data(place).returnWeights(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt64 param_num;
Float64 learning_rate;

View File

@ -162,11 +162,6 @@ public:
result_column.push_back(position_of_max_intersections);
}
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

View File

@ -98,8 +98,6 @@ public:
{
return nested_func->allocatesMemoryInArena();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -731,8 +731,6 @@ public:
{
this->data(place).insertResultInto(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -71,8 +71,6 @@ public:
{
to.insertDefault();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -176,8 +176,6 @@ public:
{
return nested_function->isState();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -49,11 +49,6 @@ public:
return nested_function->getName() + "OrDefault";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();

View File

@ -179,8 +179,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
static void assertSecondArg(const DataTypes & types)
{
if constexpr (has_second_arg)

View File

@ -72,11 +72,6 @@ public:
return nested_function->getName() + "Resample";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();

View File

@ -144,11 +144,6 @@ public:
offsets_to.push_back(current_offset);
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

View File

@ -180,8 +180,6 @@ public:
this->data(place).deserialize(buf);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
enum class PatternActionType
{

View File

@ -109,11 +109,6 @@ public:
return "simpleLinearRegression";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void add(
AggregateDataPtr place,
const IColumn ** columns,

View File

@ -94,8 +94,6 @@ public:
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -147,8 +147,6 @@ public:
{
this->data(place).publish(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
/** Implementing the varSamp function.
@ -401,8 +399,6 @@ public:
{
this->data(place).publish(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
/** Implementing the covarSamp function.

View File

@ -552,8 +552,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 src_scale;
};

View File

@ -147,8 +147,6 @@ public:
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 scale;
};

View File

@ -261,8 +261,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
bool keepKey(const T & key) const { return static_cast<const Derived &>(*this).keepKey(key); }
};

View File

@ -281,7 +281,5 @@ public:
}
bool allocatesMemoryInArena() const override { return true; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -103,8 +103,6 @@ public:
for (auto it = result_vec.begin(); it != result_vec.end(); ++it, ++i)
data_to[old_size + i] = it->key;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -230,8 +228,6 @@ public:
data_to.deserializeAndInsertFromArena(elem.key.data);
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -244,8 +244,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -300,8 +298,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -109,7 +109,7 @@ struct AggregateFunctionUniqCombinedData : public AggregateFunctionUniqCombinedD
};
/// For String keys, 64 bit hash is always used (both for uniqCombined and uniqCombined64),
/// For String keys, 64 bit hash is always used (both for uniqCombined and uniqCombined64),
/// because of backwards compatibility (64 bit hash was already used for uniqCombined).
template <UInt8 K, typename HashValueType>
struct AggregateFunctionUniqCombinedData<String, K, HashValueType> : public AggregateFunctionUniqCombinedDataWithKey<UInt64 /*always*/, K>
@ -171,11 +171,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
/** For multiple arguments. To compute, hashes them.
@ -238,11 +233,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

View File

@ -184,8 +184,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -248,8 +246,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -245,11 +245,6 @@ public:
{
assert_cast<ColumnUInt8 &>(to).getData().push_back(getEventLevel(this->data(place)));
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

View File

@ -148,12 +148,6 @@ public:
addBatchArray(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const = 0;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
* const char * getHeaderFilePath() const override { return __FILE__; }
*/
virtual const char * getHeaderFilePath() const = 0;
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }

View File

@ -127,8 +127,8 @@ public:
uint64_t line = 0;
};
/**
* Find the file and line number information corresponding to address.
/** Find the file and line number information corresponding to address.
* The address must be physical - offset in object file without offset in virtual memory where the object is loaded.
*/
bool findAddress(uintptr_t address, LocationInfo & info, LocationInfoMode mode) const;

View File

@ -476,7 +476,8 @@ namespace ErrorCodes
extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int CACHE_DICTIONARY_UPDATE_FAIL = 502;
extern const int CANNOT_SIGQUEUE = 502;
extern const int CACHE_DICTIONARY_UPDATE_FAIL = 503;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

112
dbms/src/Common/PipeFDs.cpp Normal file
View File

@ -0,0 +1,112 @@
#include <Common/PipeFDs.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#include <string>
#include <algorithm>
#include <limits>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PIPE;
extern const int CANNOT_FCNTL;
extern const int LOGICAL_ERROR;
}
/// Creates the pipe and stores its two descriptors in `fds_rw`, both marked close-on-exec.
/// Throws LOGICAL_ERROR if either slot already holds a descriptor, CANNOT_PIPE if the pipe
/// cannot be created, and CANNOT_FCNTL if close-on-exec cannot be set (Apple code path only).
void LazyPipeFDs::open()
{
    const bool already_opened = (fds_rw[0] >= 0) || (fds_rw[1] >= 0);
    if (already_opened)
        throw Exception("Pipe is already opened", ErrorCodes::LOGICAL_ERROR);

#if defined(__APPLE__)
    /// pipe2() is not used on this platform (presumably it is unavailable there),
    /// so the pipe is created first and close-on-exec is set on each end afterwards.
    if (0 != pipe(fds_rw))
        throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);

    if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
        throwFromErrno("Cannot setup auto-close on exec for read end of pipe", ErrorCodes::CANNOT_FCNTL);

    if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
        throwFromErrno("Cannot setup auto-close on exec for write end of pipe", ErrorCodes::CANNOT_FCNTL);
#else
    /// Create the pipe and set close-on-exec on both ends in a single call.
    if (0 != pipe2(fds_rw, O_CLOEXEC))
        throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
#endif
}
/// Closes every descriptor that is currently open and resets its slot to -1.
/// Both ends are always attempted, even if closing the first one fails; in that case
/// the first error is reported by throwing CANNOT_PIPE after the loop.
void LazyPipeFDs::close()
{
    int first_errno = 0;

    for (int & fd : fds_rw)
    {
        if (fd < 0)
            continue;

        /// Forget the descriptor before calling ::close(). After a failed close() the state
        /// of the descriptor is unspecified by POSIX (and on Linux it is released anyway),
        /// so it must never be passed to ::close() a second time, e.g. from the destructor.
        const int descriptor = fd;
        fd = -1;

        if (0 != ::close(descriptor) && 0 == first_errno)
            first_errno = errno;
    }

    if (first_errno)
    {
        /// throwFromErrno() reports the current errno, so restore the one of the first failure.
        errno = first_errno;
        throwFromErrno("Cannot close pipe", ErrorCodes::CANNOT_PIPE);
    }
}
/// Opens the pipe right away; any exception thrown by open() propagates to the caller.
PipeFDs::PipeFDs()
{
    LazyPipeFDs::open();
}
/// Closes any descriptors that are still open.
/// close() may throw, and an exception must not escape a destructor, so a failure is
/// handed to tryLogCurrentException() instead of being rethrown.
LazyPipeFDs::~LazyPipeFDs()
{
try
{
close();
}
catch (...)
{
/// __PRETTY_FUNCTION__ is passed so the report identifies this destructor.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Adds O_NONBLOCK to the file status flags of the write end (`fds_rw[1]`).
/// The read end is not touched. Throws CANNOT_FCNTL if the flags cannot be read or updated.
void LazyPipeFDs::setNonBlocking()
{
    const int write_fd = fds_rw[1];

    const int current_flags = fcntl(write_fd, F_GETFL, 0);
    if (current_flags == -1)
        throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);

    const int set_result = fcntl(write_fd, F_SETFL, current_flags | O_NONBLOCK);
    if (set_result == -1)
        throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
}
/// Tries to grow the capacity of the pipe to at least `desired_size` bytes by repeatedly
/// doubling it (Linux only; a no-op elsewhere).
/// Stops quietly when the kernel refuses a step with EPERM, and only logs a message when
/// the kernel does not support querying the capacity (EINVAL).
/// Throws CANNOT_FCNTL on any other fcntl failure.
void LazyPipeFDs::tryIncreaseSize(int desired_size)
{
#if defined(OS_LINUX)
    Poco::Logger * log = &Poco::Logger::get("Pipe");

    /** Increase pipe size to avoid slowdown during fine-grained trace collection.
      */
    int pipe_size = fcntl(fds_rw[1], F_GETPIPE_SZ);
    if (-1 == pipe_size)
    {
        if (errno == EINVAL)
        {
            LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
            /// It will work nevertheless.
        }
        else
            throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);

        return;
    }

    /// `pipe_size` always holds the last capacity that was successfully requested (or the
    /// initial one), so a refused step is never counted and the logged value is not inflated.
    while (pipe_size < desired_size)
    {
        /// Doubling would overflow int; nothing more can be requested.
        if (pipe_size > std::numeric_limits<int>::max() / 2)
            break;

        const int new_size = pipe_size * 2;
        if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, new_size))
        {
            /// Not permitted to grow the pipe any further: keep what we have.
            if (errno == EPERM)
                break;

            throwFromErrno("Cannot increase pipe capacity to " + std::to_string(new_size), ErrorCodes::CANNOT_FCNTL);
        }

        pipe_size = new_size;
    }

    LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(pipe_size));
#else
    (void)desired_size;
#endif
}
}

35
dbms/src/Common/PipeFDs.h Normal file
View File

@ -0,0 +1,35 @@
#pragma once
#include <cstddef>
namespace DB
{
/** Struct containing a pipe with lazy initialization.
  * Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access
  * pipe's file descriptors.
  *
  * `fds_rw[0]` is the read end and `fds_rw[1]` is the write end; a value of -1 means
  * "not opened". The destructor closes whatever is still open, so the struct owns its
  * descriptors and is therefore non-copyable: a copy would close the same descriptors twice.
  */
struct LazyPipeFDs
{
    int fds_rw[2] = {-1, -1};

    LazyPipeFDs() = default;
    LazyPipeFDs(const LazyPipeFDs &) = delete;
    LazyPipeFDs & operator=(const LazyPipeFDs &) = delete;

    /// Creates the pipe. Throws if it is already opened or cannot be created.
    void open();

    /// Closes the descriptors that are open and marks them as not opened.
    void close();

    /// Switches the write end of the pipe to non-blocking mode.
    void setNonBlocking();

    /// Tries to grow the pipe capacity up to `desired_size` bytes (Linux only).
    void tryIncreaseSize(int desired_size);

    ~LazyPipeFDs();
};
/** A pipe that is opened in the constructor and closed by the base class destructor.
  * The descriptors are available through the inherited `fds_rw` field.
  */
struct PipeFDs : LazyPipeFDs
{
    PipeFDs();
};
}

View File

@ -1,12 +1,12 @@
#include "QueryProfiler.h"
#include <random>
#include <common/Pipe.h>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <Common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/thread_local_rng.h>
@ -22,7 +22,7 @@ namespace ProfileEvents
namespace DB
{
extern LazyPipe trace_pipe;
extern LazyPipeFDs trace_pipe;
namespace
{

View File

@ -4,11 +4,11 @@
#include <dlfcn.h>
#include <Common/Exception.h>
#include <Common/ShellCommand.h>
#include <Common/PipeFDs.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include <port/unistd.h>
#include <csignal>
#include <common/Pipe.h>
namespace
{
@ -66,9 +66,9 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, c
if (!real_vfork)
throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM);
Pipe pipe_stdin;
Pipe pipe_stdout;
Pipe pipe_stderr;
PipeFDs pipe_stdin;
PipeFDs pipe_stdout;
PipeFDs pipe_stderr;
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();

View File

@ -258,10 +258,14 @@ static void toStringEveryLineImpl(const StackTrace::Frames & frames, size_t offs
for (size_t i = offset; i < size; ++i)
{
const void * addr = frames[i];
const void * virtual_addr = frames[i];
auto object = symbol_index.findObject(virtual_addr);
uintptr_t virtual_offset = object ? uintptr_t(object->address_begin) : 0;
const void * physical_addr = reinterpret_cast<const void *>(uintptr_t(virtual_addr) - virtual_offset);
out << i << ". " << addr << " ";
auto symbol = symbol_index.findSymbol(addr);
out << i << ". " << physical_addr << " ";
auto symbol = symbol_index.findSymbol(virtual_addr);
if (symbol)
{
int status = 0;
@ -272,18 +276,17 @@ static void toStringEveryLineImpl(const StackTrace::Frames & frames, size_t offs
out << " ";
if (auto object = symbol_index.findObject(addr))
if (object)
{
if (std::filesystem::exists(object->name))
{
auto dwarf_it = dwarfs.try_emplace(object->name, *object->elf).first;
DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress(uintptr_t(addr) - uintptr_t(object->address_begin), location, DB::Dwarf::LocationInfoMode::FAST))
if (dwarf_it->second.findAddress(uintptr_t(physical_addr), location, DB::Dwarf::LocationInfoMode::FAST))
out << location.file.toString() << ":" << location.line;
else
out << object->name;
}
out << " in " << object->name;
}
else
out << "?";

View File

@ -38,6 +38,7 @@ public:
std::unique_ptr<Elf> elf;
};
/// Address in virtual memory should be passed. These addresses include offset where the object is loaded in memory.
const Symbol * findSymbol(const void * address) const;
const Object * findObject(const void * address) const;

View File

@ -24,7 +24,7 @@ public:
/// Whether the current process has permissions (sudo or cap_net_admin capabilities) to get taskstats info
static bool checkPermissions();
#if defined(__linux__)
#if defined(OS_LINUX)
private:
int netlink_socket_fd = -1;
UInt16 taskstats_family_id = 0;

View File

@ -2,7 +2,7 @@
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
@ -19,13 +19,12 @@
namespace DB
{
LazyPipe trace_pipe;
LazyPipeFDs trace_pipe;
namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
extern const int CANNOT_FCNTL;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
@ -40,36 +39,8 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0);
if (-1 == flags)
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
#if defined(OS_LINUX)
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ);
if (-1 == pipe_size)
{
if (errno == EINVAL)
{
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
/// It will work nevertheless.
}
else
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
}
else
{
constexpr int max_pipe_capacity_to_set = 1048576;
for (errno = 0; errno != EPERM && pipe_size < max_pipe_capacity_to_set; pipe_size *= 2)
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set)));
}
#endif
trace_pipe.setNonBlocking();
trace_pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}

View File

@ -49,7 +49,7 @@ int main(int argc, char ** argv)
Dwarf dwarf(*object->elf);
Dwarf::LocationInfo location;
if (dwarf.findAddress(uintptr_t(address), location, Dwarf::LocationInfoMode::FAST))
if (dwarf.findAddress(uintptr_t(address) - uintptr_t(info.dli_fbase), location, Dwarf::LocationInfoMode::FAST))
std::cerr << location.file.toString() << ":" << location.line << "\n";
else
std::cerr << "Dwarf: Not found\n";

View File

@ -386,6 +386,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(SettingUInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -65,7 +65,7 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
auto parser = std::make_unique<InputStreamFromInputFormat>(
input_processor_creator(read_buffer, header, context,
input_processor_creator(read_buffer, header,
row_input_format_params, format_settings));
unit.block_ext.block.clear();

View File

@ -55,23 +55,21 @@ private:
using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const Context & context,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
public:
struct InputCreatorParams
{
const Block &sample;
const Context &context;
const RowInputFormatParams& row_input_format_params;
const Block & sample;
const RowInputFormatParams & row_input_format_params;
const FormatSettings &settings;
};
struct Params
{
ReadBuffer & read_buffer;
const InputProcessorCreator &input_processor_creator;
const InputCreatorParams &input_creator_params;
const InputProcessorCreator & input_processor_creator;
const InputCreatorParams & input_creator_params;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
int max_threads;
size_t min_chunk_bytes;
@ -79,7 +77,6 @@ public:
explicit ParallelParsingBlockInputStream(const Params & params)
: header(params.input_creator_params.sample),
context(params.input_creator_params.context),
row_input_format_params(params.input_creator_params.row_input_format_params),
format_settings(params.input_creator_params.settings),
input_processor_creator(params.input_processor_creator),
@ -149,7 +146,6 @@ protected:
private:
const Block header;
const Context context;
const RowInputFormatParams row_input_format_params;
const FormatSettings format_settings;
const InputProcessorCreator input_processor_creator;

View File

@ -344,16 +344,8 @@ void DatabaseOrdinary::alterTable(
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
if (ast_create_query.columns_list->indices)
ast_create_query.columns_list->replace(ast_create_query.columns_list->indices, new_indices);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->indices, new_indices);
if (ast_create_query.columns_list->constraints)
ast_create_query.columns_list->replace(ast_create_query.columns_list->constraints, new_constraints);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->constraints, new_constraints);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);

View File

@ -76,6 +76,9 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
/// Processors are not supported here yet.
context.getSettingsRef().experimental_use_processors = false;
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context.makeQueryContext();
}
@ -100,6 +103,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{other.load_all_query}
{
context.makeQueryContext();
}
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()

View File

@ -12,6 +12,8 @@
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
namespace DB
@ -34,7 +36,7 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
}
static FormatSettings getInputFormatSetting(const Settings & settings)
static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
@ -56,11 +58,21 @@ static FormatSettings getInputFormatSetting(const Settings & settings)
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
return format_settings;
}
static FormatSettings getOutputFormatSetting(const Settings & settings)
static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
@ -77,6 +89,16 @@ static FormatSettings getOutputFormatSetting(const Settings & settings)
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
return format_settings;
}
@ -100,9 +122,9 @@ BlockInputStreamPtr FormatFactory::getInput(
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings);
FormatSettings format_settings = getInputFormatSetting(settings, context);
return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
}
const Settings & settings = context.getSettingsRef();
@ -118,7 +140,7 @@ BlockInputStreamPtr FormatFactory::getInput(
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
FormatSettings format_settings = getInputFormatSetting(settings);
FormatSettings format_settings = getInputFormatSetting(settings, context);
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
@ -128,7 +150,7 @@ BlockInputStreamPtr FormatFactory::getInput(
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
static_cast<int>(settings.max_threads),
@ -164,16 +186,16 @@ BlockOutputStreamPtr FormatFactory::getOutput(
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings);
FormatSettings format_settings = getOutputFormatSetting(settings, context);
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, context, callback, format_settings), sample);
output_getter(buf, sample, std::move(callback), format_settings), sample);
}
auto format = getOutputFormat(name, buf, sample, context, callback);
auto format = getOutputFormat(name, buf, sample, context, std::move(callback));
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
@ -191,7 +213,7 @@ InputFormatPtr FormatFactory::getInputFormat(
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings);
FormatSettings format_settings = getInputFormatSetting(settings, context);
RowInputFormatParams params;
params.max_block_size = max_block_size;
@ -201,7 +223,13 @@ InputFormatPtr FormatFactory::getInputFormat(
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
return input_getter(buf, sample, context, params, format_settings);
auto format = input_getter(buf, sample, params, format_settings);
/// It's a kludge, because I cannot remove context from the Values format.
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
values->setContext(context);
return format;
}
@ -213,12 +241,18 @@ OutputFormatPtr FormatFactory::getOutputFormat(
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings);
FormatSettings format_settings = getOutputFormatSetting(settings, context);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return output_getter(buf, sample, context, callback, format_settings);
auto format = output_getter(buf, sample, std::move(callback), format_settings);
/// It's a kludge, because I cannot remove context from the MySQL format.
if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get()))
mysql->setContext(context);
return format;
}
@ -259,7 +293,7 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
auto & target = dict[name].file_segmentation_engine;
if (target)
throw Exception("FormatFactory: File segmentation engine " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = file_segmentation_engine;
target = std::move(file_segmentation_engine);
}
FormatFactory::FormatFactory()

View File

@ -59,7 +59,6 @@ private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback,
const FormatSettings & settings)>;
@ -67,21 +66,18 @@ private:
using OutputCreator = std::function<BlockOutputStreamPtr(
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;
using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const Context & context,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;

View File

@ -26,7 +26,7 @@ namespace
}
FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & format_schema, const String & format, bool require_message)
FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path)
{
if (format_schema.empty())
throw Exception(
@ -54,29 +54,25 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & forma
else
path.assign(format_schema).makeFile().getFileName();
auto default_schema_directory = [&context]()
auto default_schema_directory = [&format_schema_path]()
{
static const String str = Poco::Path(context.getFormatSchemaPath()).makeAbsolute().makeDirectory().toString();
static const String str = Poco::Path(format_schema_path).makeAbsolute().makeDirectory().toString();
return str;
};
auto is_server = [&context]()
{
return context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
};
if (path.getExtension().empty() && !default_file_extension.empty())
path.setExtension(default_file_extension);
if (path.isAbsolute())
{
if (is_server())
if (is_server)
throw Exception("Absolute path in the 'format_schema' setting is prohibited: " + path.toString(), ErrorCodes::BAD_ARGUMENTS);
schema_path = path.getFileName();
schema_directory = path.makeParent().toString();
}
else if (path.depth() >= 1 && path.directory(0) == "..")
{
if (is_server())
if (is_server)
throw Exception(
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: " + path.toString(),
ErrorCodes::BAD_ARGUMENTS);

View File

@ -10,7 +10,7 @@ class Context;
class FormatSchemaInfo
{
public:
FormatSchemaInfo(const Context & context, const String & format_schema, const String & format, bool require_message);
FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path);
/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }

View File

@ -89,6 +89,27 @@ struct FormatSettings
UInt64 row_group_size = 1000000;
} parquet;
/// Values needed to resolve the 'format_schema' setting without access to Context.
/// FormatFactory fills them in; FormatSchemaInfo takes the same three values as constructor arguments.
struct Schema
{
/// Value of the 'format_schema' setting.
std::string format_schema;
/// Result of Context::getFormatSchemaPath(); FormatSchemaInfo derives the default schema directory from it.
std::string format_schema_path;
/// True when a global context exists and its application type is SERVER.
/// FormatSchemaInfo rejects absolute schema paths and paths starting with ".." when this is set.
bool is_server = false;
};
Schema schema;
/// Copies of the 'format_custom_*' settings, filled in by FormatFactory.
/// ParsedTemplateFormatString::setupCustomSeparated*Format builds its templates from them.
struct Custom
{
/// 'format_custom_result_before_delimiter': delimiter placed before ${data} in the resultset template.
std::string result_before_delimiter;
/// 'format_custom_result_after_delimiter': delimiter placed after ${data} in the resultset template.
std::string result_after_delimiter;
/// 'format_custom_row_before_delimiter': delimiter placed before the first column of a row.
std::string row_before_delimiter;
/// 'format_custom_row_after_delimiter': delimiter placed after the last column of a row.
std::string row_after_delimiter;
/// 'format_custom_row_between_delimiter'.
/// NOTE(review): presumably written between consecutive rows; its consumer is not visible here - confirm.
std::string row_between_delimiter;
/// 'format_custom_field_delimiter': delimiter placed after every column except the last one.
std::string field_delimiter;
/// 'format_custom_escaping_rule': converted with ParsedTemplateFormatString::stringToFormat and applied to every column.
std::string escaping_rule;
};
Custom custom;
};
}

View File

@ -11,7 +11,6 @@ void registerInputFormatNative(FormatFactory & factory)
factory.registerInputFormat("Native", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
UInt64 /* max_block_size */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings &)
@ -25,7 +24,6 @@ void registerOutputFormatNative(FormatFactory & factory)
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{

View File

@ -10,7 +10,6 @@ void registerOutputFormatNull(FormatFactory & factory)
factory.registerOutputFormat("Null", [](
WriteBuffer &,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{

View File

@ -234,36 +234,32 @@ void ParsedTemplateFormatString::throwInvalidFormat(const String & message, size
ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
ParsedTemplateFormatString ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(const Context & context)
ParsedTemplateFormatString ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(const FormatSettings::Custom & settings)
{
const Settings & settings = context.getSettingsRef();
/// Set resultset format to "result_before_delimiter ${data} result_after_delimiter"
ParsedTemplateFormatString resultset_format;
resultset_format.delimiters.emplace_back(settings.format_custom_result_before_delimiter);
resultset_format.delimiters.emplace_back(settings.format_custom_result_after_delimiter);
resultset_format.delimiters.emplace_back(settings.result_before_delimiter);
resultset_format.delimiters.emplace_back(settings.result_after_delimiter);
resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None);
resultset_format.format_idx_to_column_idx.emplace_back(0);
resultset_format.column_names.emplace_back("data");
return resultset_format;
}
ParsedTemplateFormatString ParsedTemplateFormatString::setupCustomSeparatedRowFormat(const Context & context, const Block & sample)
ParsedTemplateFormatString ParsedTemplateFormatString::setupCustomSeparatedRowFormat(const FormatSettings::Custom & settings, const Block & sample)
{
const Settings & settings = context.getSettingsRef();
/// Set row format to
/// "row_before_delimiter ${Col0:escaping} field_delimiter ${Col1:escaping} field_delimiter ... ${ColN:escaping} row_after_delimiter"
ParsedTemplateFormatString::ColumnFormat escaping = ParsedTemplateFormatString::stringToFormat(settings.format_custom_escaping_rule);
ParsedTemplateFormatString::ColumnFormat escaping = ParsedTemplateFormatString::stringToFormat(settings.escaping_rule);
ParsedTemplateFormatString row_format;
row_format.delimiters.emplace_back(settings.format_custom_row_before_delimiter);
row_format.delimiters.emplace_back(settings.row_before_delimiter);
for (size_t i = 0; i < sample.columns(); ++i)
{
row_format.formats.emplace_back(escaping);
row_format.format_idx_to_column_idx.emplace_back(i);
row_format.column_names.emplace_back(sample.getByPosition(i).name);
bool last_column = i == sample.columns() - 1;
row_format.delimiters.emplace_back(last_column ? settings.format_custom_row_after_delimiter : settings.format_custom_field_delimiter);
row_format.delimiters.emplace_back(last_column ? settings.row_after_delimiter : settings.field_delimiter);
}
return row_format;
}

View File

@ -4,6 +4,7 @@
#include <functional>
#include <optional>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
namespace DB
{
@ -49,8 +50,8 @@ struct ParsedTemplateFormatString
String dump() const;
[[noreturn]] void throwInvalidFormat(const String & message, size_t column) const;
static ParsedTemplateFormatString setupCustomSeparatedResultsetFormat(const Context & context);
static ParsedTemplateFormatString setupCustomSeparatedRowFormat(const Context & context, const Block & sample);
static ParsedTemplateFormatString setupCustomSeparatedResultsetFormat(const FormatSettings::Custom & settings);
static ParsedTemplateFormatString setupCustomSeparatedRowFormat(const FormatSettings::Custom & settings, const Block & sample);
};
}

View File

@ -5,29 +5,6 @@
namespace DB
{
void throwExceptionForIncompletelyParsedValue(
ReadBuffer & read_buffer, Block & block, size_t result)
{
const IDataType & to_type = *block.getByPosition(result).type;
WriteBufferFromOwnString message_buf;
message_buf << "Cannot parse string " << quote << String(read_buffer.buffer().begin(), read_buffer.buffer().size())
<< " as " << to_type.getName()
<< ": syntax error";
if (read_buffer.offset())
message_buf << " at position " << read_buffer.offset()
<< " (parsed just " << quote << String(read_buffer.buffer().begin(), read_buffer.offset()) << ")";
else
message_buf << " at begin of string";
if (isNativeNumber(to_type))
message_buf << ". Note: there are to" << to_type.getName() << "OrZero and to" << to_type.getName() << "OrNull functions, which returns zero/NULL instead of throwing exception.";
throw Exception(message_buf.str(), ErrorCodes::CANNOT_PARSE_TEXT);
}
void registerFunctionsConversion(FunctionFactory & factory)
{
factory.registerFunction<FunctionToUInt8>();

View File

@ -501,7 +501,26 @@ inline bool tryParseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, Read
/** Throw exception with verbose message when string value is not parsed completely.
*/
[[noreturn]] void throwExceptionForIncompletelyParsedValue(ReadBuffer & read_buffer, Block & block, size_t result);
[[noreturn]] inline void throwExceptionForIncompletelyParsedValue(ReadBuffer & read_buffer, Block & block, size_t result)
{
    /// The type we were converting to is the type of the result column.
    const IDataType & target_type = *block.getByPosition(result).type;
    const auto target_name = target_type.getName();

    /// Number of bytes consumed before parsing stopped.
    const auto parsed_bytes = read_buffer.offset();

    WriteBufferFromOwnString description;
    description << "Cannot parse string " << quote << String(read_buffer.buffer().begin(), read_buffer.buffer().size());
    description << " as " << target_name;
    description << ": syntax error";

    if (parsed_bytes)
    {
        description << " at position " << parsed_bytes;
        description << " (parsed just " << quote << String(read_buffer.buffer().begin(), parsed_bytes) << ")";
    }
    else
    {
        description << " at begin of string";
    }

    /// For native numeric types, point the user at the non-throwing conversion functions.
    if (isNativeNumber(target_type))
    {
        description << ". Note: there are to" << target_name << "OrZero and to" << target_name << "OrNull functions, which returns zero/NULL instead of throwing exception.";
    }

    throw Exception(description.str(), ErrorCodes::CANNOT_PARSE_TEXT);
}
enum class ConvertFromStringExceptionMode
@ -886,6 +905,7 @@ public:
static constexpr bool to_datetime64 = std::is_same_v<ToDataType, DataTypeDateTime64>;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionConvert>(); }
static FunctionPtr create() { return std::make_shared<FunctionConvert>(); }
String getName() const override
{
@ -1083,6 +1103,7 @@ public:
std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionConvertFromString>(); }
static FunctionPtr create() { return std::make_shared<FunctionConvertFromString>(); }
String getName() const override
{
@ -1231,6 +1252,7 @@ class FunctionToFixedString : public IFunction
public:
static constexpr auto name = "toFixedString";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionToFixedString>(); }
static FunctionPtr create() { return std::make_shared<FunctionToFixedString>(); }
String getName() const override
{
@ -1686,9 +1708,9 @@ public:
using WrapperType = std::function<void(Block &, const ColumnNumbers &, size_t, size_t)>;
using MonotonicityForRange = std::function<Monotonicity(const IDataType &, const Field &, const Field &)>;
FunctionCast(const Context & context_, const char * name_, MonotonicityForRange && monotonicity_for_range_
FunctionCast(const char * name_, MonotonicityForRange && monotonicity_for_range_
, const DataTypes & argument_types_, const DataTypePtr & return_type_)
: context(context_), name(name_), monotonicity_for_range(monotonicity_for_range_)
: name(name_), monotonicity_for_range(monotonicity_for_range_)
, argument_types(argument_types_), return_type(return_type_)
{
}
@ -1719,7 +1741,6 @@ public:
private:
const Context & context;
const char * name;
MonotonicityForRange monotonicity_for_range;
@ -1735,10 +1756,10 @@ private:
{
/// In case when converting to Nullable type, we apply different parsing rule,
/// that will not throw an exception but return NULL in case of malformed input.
function = FunctionConvertFromString<DataType, NameCast, ConvertFromStringExceptionMode::Null>::create(context);
function = FunctionConvertFromString<DataType, NameCast, ConvertFromStringExceptionMode::Null>::create();
}
else
function = FunctionTo<DataType>::Type::create(context);
function = FunctionTo<DataType>::Type::create();
auto function_adaptor =
FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(function))
@ -1752,7 +1773,7 @@ private:
WrapperType createStringWrapper(const DataTypePtr & from_type) const
{
FunctionPtr function = FunctionToString::create(context);
FunctionPtr function = FunctionToString::create();
auto function_adaptor =
FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(function))
@ -1780,7 +1801,7 @@ private:
if (requested_result_is_nullable)
throw Exception{"CAST AS Nullable(UUID) is not implemented", ErrorCodes::NOT_IMPLEMENTED};
FunctionPtr function = FunctionTo<DataTypeUUID>::Type::create(context);
FunctionPtr function = FunctionTo<DataTypeUUID>::Type::create();
auto function_adaptor =
FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(function))
@ -1985,7 +2006,7 @@ private:
return createStringToEnumWrapper<ColumnFixedString, EnumType>();
else if (isNativeNumber(from_type) || isEnum(from_type))
{
auto function = Function::create(context);
auto function = Function::create();
auto func_or_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(function))
.build(ColumnsWithTypeAndName{{nullptr, from_type, "" }});
@ -2337,9 +2358,10 @@ public:
using MonotonicityForRange = FunctionCast::MonotonicityForRange;
static constexpr auto name = "CAST";
static FunctionOverloadResolverImplPtr create(const Context & context) { return std::make_unique<CastOverloadResolver>(context); }
static FunctionOverloadResolverImplPtr create(const Context &) { return createImpl(); }
static FunctionOverloadResolverImplPtr createImpl() { return std::make_unique<CastOverloadResolver>(); }
CastOverloadResolver(const Context & context_) : context(context_) {}
CastOverloadResolver() {}
String getName() const override { return name; }
@ -2357,7 +2379,7 @@ protected:
data_types[i] = arguments[i].type;
auto monotonicity = getMonotonicityInformation(arguments.front().type, return_type.get());
return std::make_unique<FunctionCast>(context, name, std::move(monotonicity), data_types, return_type);
return std::make_unique<FunctionCast>(name, std::move(monotonicity), data_types, return_type);
}
DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments) const override
@ -2418,8 +2440,6 @@ private:
/// other types like Null, FixedString, Array and Tuple have no monotonicity defined
return {};
}
const Context & context;
};
}

View File

@ -83,6 +83,10 @@ public:
{
abort();
}
else if (mode == "std::terminate")
{
std::terminate();
}
else if (mode == "use after free")
{
int * x_ptr;

View File

@ -46,7 +46,14 @@ BrotliWriteBuffer::BrotliWriteBuffer(WriteBuffer & out_, int compression_level,
/// Finalizes the stream on destruction.
/// finish() is invoked only inside the try block: an exception escaping a destructor
/// terminates the program, so a failure here is logged and swallowed instead.
BrotliWriteBuffer::~BrotliWriteBuffer()
{
    try
    {
        finish();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
void BrotliWriteBuffer::nextImpl()

View File

@ -8,10 +8,6 @@ WriteBufferFromFileBase::WriteBufferFromFileBase(size_t buf_size, char * existin
{
}
WriteBufferFromFileBase::~WriteBufferFromFileBase()
{
}
off_t WriteBufferFromFileBase::seek(off_t off, int whence)
{
return doSeek(off, whence);

View File

@ -13,12 +13,12 @@ class WriteBufferFromFileBase : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
virtual ~WriteBufferFromFileBase();
~WriteBufferFromFileBase() override = default;
off_t seek(off_t off, int whence = SEEK_SET);
void truncate(off_t length = 0);
virtual off_t getPositionInFile() = 0;
virtual void sync() = 0;
void sync() override = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;

View File

@ -89,8 +89,14 @@ public:
/// Finishes the buffer on destruction unless that has already happened.
/// Any exception from finish() is logged and suppressed so that it cannot leave the destructor.
~WriteBufferFromVector() override
{
    if (is_finished)
        return;

    try
    {
        finish();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
};

View File

@ -133,4 +133,17 @@ void WriteBufferValidUTF8::finish()
putReplacement();
}
/// Calls finish() on destruction.
/// Any exception thrown by finish() is logged via tryLogCurrentException and swallowed,
/// so nothing propagates out of the destructor.
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -35,10 +35,7 @@ public:
const char * replacement_ = "\xEF\xBF\xBD",
size_t size = DEFAULT_SIZE);
virtual ~WriteBufferValidUTF8() override
{
finish();
}
~WriteBufferValidUTF8() override;
};
}

View File

@ -98,14 +98,6 @@ NameSet AnalyzedJoin::getQualifiedColumnsSet() const
return out;
}
NameSet AnalyzedJoin::getOriginalColumnsSet() const
{
NameSet out;
for (const auto & names : original_names)
out.insert(names.second);
return out;
}
NamesWithAliases AnalyzedJoin::getNamesWithAliases(const NameSet & required_columns) const
{
NamesWithAliases out;

View File

@ -96,7 +96,6 @@ public:
bool hasOn() const { return table_join.on_expression != nullptr; }
NameSet getQualifiedColumnsSet() const;
NameSet getOriginalColumnsSet() const;
NamesWithAliases getNamesWithAliases(const NameSet & required_columns) const;
NamesWithAliases getRequiredColumns(const Block & sample, const Names & action_columns) const;

View File

@ -170,11 +170,22 @@ size_t CollectJoinOnKeysMatcher::getTableForIdentifiers(std::vector<const ASTIde
if (!membership)
{
const String & name = identifier->name;
bool in_left_table = data.source_columns.count(name);
bool in_right_table = data.joined_columns.count(name);
bool in_left_table = data.left_table.hasColumn(name);
bool in_right_table = data.right_table.hasColumn(name);
if (in_left_table && in_right_table)
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
{
/// Relax ambiguous check for multiple JOINs
if (auto original_name = IdentifierSemantic::uncover(*identifier))
{
auto match = IdentifierSemantic::canReferColumnToTable(*original_name, data.right_table.table);
if (match == IdentifierSemantic::ColumnMatch::NoMatch)
in_right_table = false;
in_left_table = !in_right_table;
}
else
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
}
if (in_left_table)
membership = 1;

View File

@ -3,6 +3,7 @@
#include <Core/Names.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/Aliases.h>
@ -25,8 +26,8 @@ public:
struct Data
{
AnalyzedJoin & analyzed_join;
const NameSet & source_columns;
const NameSet & joined_columns;
const TableWithColumnNames & left_table;
const TableWithColumnNames & right_table;
const Aliases & aliases;
const bool is_asof{false};
ASTPtr asof_left_key{};

View File

@ -53,6 +53,20 @@ struct TableWithColumnNames
for (auto & column : addition)
hidden_columns.push_back(column.name);
}
/// Tells whether `name` is one of the table's columns or hidden columns.
/// The lookup set is filled lazily, the first time it is found empty.
/// NOTE(review): the set is not refreshed if columns / hidden_columns change after it has been filled -
/// confirm that callers finish populating the table before the first lookup.
bool hasColumn(const String & name) const
{
    if (columns_set.empty())
    {
        for (const auto & column_name : columns)
            columns_set.insert(column_name);
        for (const auto & hidden_name : hidden_columns)
            columns_set.insert(hidden_name);
    }

    return columns_set.find(name) != columns_set.end();
}
private:
mutable NameSet columns_set;
};
std::vector<DatabaseAndTableWithAlias> getDatabaseAndTables(const ASTSelectQuery & select_query, const String & current_database);

View File

@ -92,6 +92,22 @@ std::optional<String> IdentifierSemantic::getTableName(const ASTPtr & ast)
return {};
}
/// For an identifier marked as covered, returns a new identifier built from a copy of its name_parts.
/// Returns an empty optional when the identifier is not covered.
std::optional<ASTIdentifier> IdentifierSemantic::uncover(const ASTIdentifier & identifier)
{
    if (!identifier.semantic->covered)
        return {};

    /// Work on a copy: the source identifier is const and must stay intact.
    std::vector<String> parts_copy = identifier.name_parts;
    return ASTIdentifier(std::move(parts_copy));
}
/// Calls setShortName(alias) on the identifier and marks it as covered,
/// i.e. its real (compound) name is hidden by the alias (see IdentifierSemanticImpl::covered).
void IdentifierSemantic::coverName(ASTIdentifier & identifier, const String & alias)
{
identifier.setShortName(alias);
identifier.semantic->covered = true;
}
bool IdentifierSemantic::canBeAlias(const ASTIdentifier & identifier)
{
return identifier.semantic->can_be_alias;

View File

@ -12,6 +12,7 @@ struct IdentifierSemanticImpl
{
bool special = false; /// for now it's 'not a column': tables, subselects and some special stuff like FORMAT
bool can_be_alias = true; /// if it's a cropped name it could not be an alias
bool covered = false; /// real (compound) name is hidden by an alias (short name)
std::optional<size_t> membership; /// table position in join
};
@ -43,6 +44,8 @@ struct IdentifierSemantic
static void setColumnLongName(ASTIdentifier & identifier, const DatabaseAndTableWithAlias & db_and_table);
static bool canBeAlias(const ASTIdentifier & identifier);
static void setMembership(ASTIdentifier &, size_t table_no);
static void coverName(ASTIdentifier &, const String & alias);
static std::optional<ASTIdentifier> uncover(const ASTIdentifier & identifier);
static std::optional<size_t> getMembership(const ASTIdentifier & identifier);
static bool chooseTable(const ASTIdentifier &, const std::vector<DatabaseAndTableWithAlias> & tables, size_t & best_table_pos,
bool ambiguous = false);

View File

@ -21,7 +21,6 @@
#include <Parsers/parseQuery.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
@ -36,7 +35,6 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
@ -97,29 +95,20 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
throw Exception("Database " + database_name + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
String database_engine_name;
if (!create.storage)
{
database_engine_name = "Ordinary"; /// Default database engine.
auto engine = std::make_shared<ASTFunction>();
engine->name = database_engine_name;
auto storage = std::make_shared<ASTStorage>();
engine->name = "Ordinary";
storage->set(storage->engine, engine);
create.set(create.storage, storage);
}
else
else if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
const ASTStorage & storage = *create.storage;
const ASTFunction & engine = *storage.engine;
/// Currently, there are no database engines, that support any arguments.
if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
std::stringstream ostr;
formatAST(storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
database_engine_name = engine.name;
std::stringstream ostr;
formatAST(*create.storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
String database_name_escaped = escapeForFileName(database_name);
@ -155,19 +144,27 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
out.close();
}
bool added = false;
bool renamed = false;
try
{
context.addDatabase(database_name, database);
added = true;
if (need_write_metadata)
{
Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path);
renamed = true;
}
database->loadStoredObjects(context, has_force_restore_data_flag);
}
catch (...)
{
if (need_write_metadata)
if (renamed)
Poco::File(metadata_file_tmp_path).remove();
if (added)
context.detachDatabase(database_name);
throw;
}
@ -396,84 +393,97 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
}
ColumnsDescription InterpreterCreateQuery::setProperties(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
TableProperties properties;
TableStructureReadLockHolder as_storage_lock;
if (create.columns_list)
{
if (create.columns_list->columns)
columns = getColumnsDescription(*create.columns_list->columns, context);
properties.columns = getColumnsDescription(*create.columns_list->columns, context);
if (create.columns_list->indices)
for (const auto & index : create.columns_list->indices->children)
indices.indices.push_back(
properties.indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (create.columns_list->constraints)
for (const auto & constraint : create.columns_list->constraints->children)
constraints.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
properties.constraints = getConstraintsDescription(create.columns_list->constraints);
}
else if (!create.as_table.empty())
{
columns = as_storage->getColumns();
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
StoragePtr as_storage = context.getTable(as_database_name, create.as_table);
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines.
/// We should not copy them for other storages.
if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
indices = as_storage->getIndices();
properties.indices = as_storage->getIndices();
constraints = as_storage->getConstraints();
properties.constraints = as_storage->getConstraints();
}
else if (create.select)
{
columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
Block as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (create.as_table_function)
return {};
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(columns);
ASTPtr new_indices = formatIndices(indices);
ASTPtr new_constraints = formatConstraints(constraints);
if (!create.columns_list)
{
auto new_columns_list = std::make_shared<ASTColumns>();
create.set(create.columns_list, new_columns_list);
}
create.set(create.columns_list, std::make_shared<ASTColumns>());
if (create.columns_list->columns)
create.columns_list->replace(create.columns_list->columns, new_columns);
else
create.columns_list->set(create.columns_list->columns, new_columns);
ASTPtr new_columns = formatColumns(properties.columns);
ASTPtr new_indices = formatIndices(properties.indices);
ASTPtr new_constraints = formatConstraints(properties.constraints);
if (new_indices && create.columns_list->indices)
create.columns_list->replace(create.columns_list->indices, new_indices);
else if (new_indices)
create.columns_list->set(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->columns, new_columns);
create.columns_list->setOrReplace(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->constraints, new_constraints);
if (new_constraints && create.columns_list->constraints)
create.columns_list->replace(create.columns_list->constraints, new_constraints);
else if (new_constraints)
create.columns_list->set(create.columns_list->constraints, new_constraints);
validateTableStructure(create, properties);
/// Set the table engine if it was not specified explicitly.
setEngine(create);
return properties;
}
void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties) const
{
/// Check for duplicates
std::set<String> all_columns;
for (const auto & column : columns)
for (const auto & column : properties.columns)
{
if (!all_columns.emplace(column.name).second)
throw Exception("Column " + backQuoteIfNeed(column.name) + " already exists", ErrorCodes::DUPLICATE_COLUMN);
}
return columns;
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : properties.columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default "
"due to expected negative impact on performance. "
"It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
@ -535,23 +545,19 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
throw Exception("Temporary tables cannot be inside a database. You should not specify a database for a temporary table.",
ErrorCodes::BAD_DATABASE_FOR_TEMPORARY_TABLE);
String path = context.getPath();
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns_list)
{
// Table SQL definition is available even if the table is detached
auto query = context.getCreateTableQuery(database_name, table_name);
auto query = context.getCreateTableQuery(create.database, create.table);
create = query->as<ASTCreateQuery &>(); // Copy the saved create query, but use ATTACH instead of CREATE
create.attach = true;
}
if (create.to_database.empty())
String current_database = context.getCurrentDatabase();
if (!create.temporary && create.database.empty())
create.database = current_database;
if (!create.to_table.empty() && create.to_database.empty())
create.to_database = current_database;
if (create.select && (create.is_view || create.is_materialized_view || create.is_live_view))
@ -560,26 +566,63 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.select);
}
Block as_select_sample;
if (create.select && (!create.attach || !create.columns_list))
as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = setProperties(create);
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
/// Actually creates table
bool created = doCreateTable(create, properties);
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
if (!created) /// Table already exists
return {};
if (!as_table_name.empty())
return fillTableIfNeeded(create);
}
bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties)
{
std::unique_ptr<DDLGuard> guard;
String data_path;
DatabasePtr database;
const String & table_name = create.table;
bool need_add_to_database = !create.temporary || create.is_live_view;
if (need_add_to_database)
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
database = context.getDatabase(create.database);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If the table doesn't exist, one thread creates the table while the others wait in DDLGuard.
*/
guard = context.getDDLGuard(create.database, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
/// TODO Check structure of table
if (create.if_not_exists)
return false;
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = create.database;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return false;
ColumnsDescription columns;
ConstraintsDescription constraints;
StoragePtr res;
if (create.as_table_function)
{
const auto & table_function = create.as_table_function->as<ASTFunction &>();
@ -588,99 +631,38 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
else
{
/// Set and retrieve list of columns.
columns = setProperties(create, as_select_sample, as_storage);
constraints = getConstraintsDescription(create.columns_list->constraints);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
res = StorageFactory::instance().get(create,
data_path,
table_name,
create.database,
context,
context.getGlobalContext(),
properties.columns,
properties.constraints,
create.attach,
false);
}
{
std::unique_ptr<DDLGuard> guard;
if (need_add_to_database)
database->createTable(context, table_name, res, query_ptr);
else
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
String data_path;
DatabasePtr database;
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
if (!create.temporary || create.is_live_view)
{
database = context.getDatabase(database_name);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If the table doesn't exist, one thread creates the table while the others wait in DDLGuard.
*/
guard = context.getDDLGuard(database_name, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
/// TODO Check structure of table
if (create.if_not_exists)
return {};
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = database_name;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
if (!create.as_table_function)
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
constraints,
create.attach,
false);
}
if (create.temporary && !create.is_live_view)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
res->startup();
}
res->startup();
return true;
}
BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
{
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_view && !create.is_live_view && (!create.is_materialized_view || create.is_populate))
@ -688,9 +670,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto insert = std::make_shared<ASTInsertQuery>();
if (!create.temporary)
insert->database = database_name;
insert->database = create.database;
insert->table = table_name;
insert->table = create.table;
insert->select = create.select->clone();
if (create.temporary && !context.getSessionContext().hasQueryContext())

View File

@ -49,15 +49,28 @@ public:
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
private:
/// Bundle of table properties handled together by the CREATE TABLE code path:
/// setProperties() returns it and doCreateTable() / validateTableStructure() consume it.
struct TableProperties
{
/// Descriptions of the table columns.
ColumnsDescription columns;
/// Descriptions of the table indices.
IndicesDescription indices;
/// Descriptions of the table constraints.
ConstraintsDescription constraints;
};
BlockIO createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create);
BlockIO createDictionary(ASTCreateQuery & create);
/// Calculate list of columns, constraints, indices, etc... of table and return columns.
ColumnsDescription setProperties(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties setProperties(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void setEngine(ASTCreateQuery & create) const;
void checkAccess(const ASTCreateQuery & create);
/// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false.
bool doCreateTable(const ASTCreateQuery & create, const TableProperties & properties);
/// Inserts data in created table if it's CREATE ... SELECT
BlockIO fillTableIfNeeded(const ASTCreateQuery & create);
ASTPtr query_ptr;
Context & context;

View File

@ -159,7 +159,7 @@ struct ColumnAliasesMatcher
aliases[alias] = long_name;
rev_aliases[long_name].push_back(alias);
identifier->setShortName(alias);
IdentifierSemantic::coverName(*identifier, alias);
if (is_public)
{
identifier->setAlias(long_name);
@ -177,7 +177,7 @@ struct ColumnAliasesMatcher
if (is_public && allowed_long_names.count(long_name))
; /// leave original name unchanged for correct output
else
identifier->setShortName(it->second[0]);
IdentifierSemantic::coverName(*identifier, it->second[0]);
}
}
}
@ -229,7 +229,7 @@ struct ColumnAliasesMatcher
if (!last_table)
{
node.setShortName(alias);
IdentifierSemantic::coverName(node, alias);
node.setAlias("");
}
}

View File

@ -339,6 +339,8 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
affected_materialized.emplace(mat_column);
}
/// Just to be sure that we don't change the type
/// after the update expression is executed.
const auto & update_expr = kv.second;
auto updated_column = makeASTFunction("CAST",
makeASTFunction("if",

View File

@ -65,8 +65,9 @@ private:
/// Each stage has output_columns that contain columns that are changed at the end of that stage
/// plus columns needed for the next mutations.
///
/// First stage is special: it can contain only DELETEs and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any).
/// First stage is special: it can contain only filters and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any). This is necessary because all mutations have
/// a `WHERE` clause.
struct Stage
{
@ -83,7 +84,7 @@ private:
/// A chain of actions needed to execute this stage.
/// First steps calculate filter columns for DELETEs (in the same order as in `filter_column_names`),
/// then there is (possibly) an UPDATE stage, and finally a projection stage.
/// then there is (possibly) an UPDATE step, and finally a projection step.
ExpressionActionsChain expressions_chain;
Names filter_column_names;
};

View File

@ -532,8 +532,8 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
}
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
const Aliases & aliases)
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query,
const std::vector<TableWithColumnNames> & tables, const Aliases & aliases)
{
const ASTTablesInSelectQueryElement * node = select_query.join();
if (!node)
@ -551,7 +551,7 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
{
bool is_asof = (table_join.strictness == ASTTableJoin::Strictness::Asof);
CollectJoinOnKeysVisitor::Data data{analyzed_join, source_columns, analyzed_join.getOriginalColumnsSet(), aliases, is_asof};
CollectJoinOnKeysVisitor::Data data{analyzed_join, tables[0], tables[1], aliases, is_asof};
CollectJoinOnKeysVisitor(data).visit(table_join.on_expression);
if (!data.has_some)
throw Exception("Cannot get JOIN keys from JOIN ON section: " + queryToString(table_join.on_expression),
@ -820,6 +820,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
if (storage)
collectSourceColumns(storage->getColumns(), result.source_columns, (select_query != nullptr));
NameSet source_columns_set = removeDuplicateColumns(result.source_columns);
std::vector<TableWithColumnNames> tables_with_columns;
if (select_query)
{
@ -837,7 +838,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
}
std::vector<const ASTTableExpression *> table_expressions = getTableExpressions(*select_query);
auto tables_with_columns = getTablesWithColumns(table_expressions, context);
tables_with_columns = getTablesWithColumns(table_expressions, context);
if (tables_with_columns.empty())
{
@ -935,7 +936,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
setJoinStrictness(*select_query, settings.join_default_strictness, settings.any_join_distinct_right_table_keys,
result.analyzed_join->table_join);
collectJoinedColumns(*result.analyzed_join, *select_query, source_columns_set, result.aliases);
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
}
result.aggregates = getAggregates(query);

View File

@ -197,6 +197,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
memory_tracker.setParent(nullptr);
query_id.clear();
query_context = nullptr;
thread_group.reset();

View File

@ -2,13 +2,14 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsConversion.h>
namespace DB
{
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const Context & context)
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
{
if (arg.type->equals(*type))
return arg.column;
@ -28,7 +29,7 @@ ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type
}
};
FunctionOverloadResolverPtr func_builder_cast = FunctionFactory::instance().get("CAST", context);
FunctionOverloadResolverPtr func_builder_cast = std::make_shared<FunctionOverloadResolverAdaptor>(CastOverloadResolver::createImpl());
ColumnsWithTypeAndName arguments{ temporary_block.getByPosition(0), temporary_block.getByPosition(1) };
auto func_cast = func_builder_cast->build(arguments);
@ -37,4 +38,9 @@ ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type
return temporary_block.getByPosition(2).column;
}
/// Overload that additionally accepts a Context.
/// The context parameter is unnamed and unused: the call is forwarded as is
/// to the two-argument castColumn(arg, type).
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const Context &)
{
return castColumn(arg, type);
}
}

View File

@ -6,7 +6,7 @@
namespace DB
{
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const Context & context);
}

View File

@ -146,6 +146,15 @@ public:
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
}
/// Make `field` refer to `child`.
/// If `field` is still null, `child` is attached through set();
/// otherwise the subtree currently held by `field` is swapped out through replace().
template <typename T>
void setOrReplace(T * & field, const ASTPtr & child)
{
    if (!field)
    {
        set(field, child);
        return;
    }

    replace(field, child);
}
/// Convert to a string.
/// Format settings.

View File

@ -248,7 +248,7 @@ namespace DB
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
arrow::Status &read_status, const Block &header,
int &row_group_current, const Context &context, std::string format_name)
int &row_group_current, std::string format_name)
{
Columns columns_list;
UInt64 num_rows = 0;
@ -389,7 +389,7 @@ namespace DB
else
column.column = std::move(read_column);
column.column = castColumn(column, column_type, context);
column.column = castColumn(column, column_type);
column.type = column_type;
num_rows = column.column->size();
columns_list.push_back(std::move(column.column));

View File

@ -39,7 +39,7 @@ namespace DB
static void arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
arrow::Status &read_status, const Block &header,
int &row_group_current, const Context &context, std::string format_name);
int &row_group_current, std::string format_name);
};
}
#endif

View File

@ -61,7 +61,6 @@ void registerInputFormatProcessorRowBinary(FormatFactory & factory)
factory.registerInputFormatProcessor("RowBinary", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
const IRowInputFormat::Params & params,
const FormatSettings &)
{
@ -71,7 +70,6 @@ void registerInputFormatProcessorRowBinary(FormatFactory & factory)
factory.registerInputFormatProcessor("RowBinaryWithNamesAndTypes", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
const IRowInputFormat::Params & params,
const FormatSettings &)
{

View File

@ -52,7 +52,6 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
factory.registerOutputFormatProcessor("RowBinary", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
@ -62,7 +61,6 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{

Some files were not shown because too many files have changed in this diff Show More