Merge branch 'master' into parquet_list_reading_fix

Maxim Ulanovskiy 2019-12-25 04:09:39 +03:00
commit b43ea04548
103 changed files with 869 additions and 733 deletions

contrib/libcxxabi (vendored)

@@ -1 +1 @@
Subproject commit c26cf36f8387c5edf2cabb4a630f0975c35aa9fb
Subproject commit 7aacd45028ecf5f1c39985ecbd4f67eed9b11ce5

@@ -84,8 +84,6 @@ public:
{
this->data(place).result.insertResultInto(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -130,8 +130,6 @@ public:
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -87,8 +87,6 @@ public:
column.getData().push_back(this->data(place).template result<ResultType>());
}
const char * getHeaderFilePath() const override { return __FILE__; }
protected:
UInt32 scale;
};

@@ -78,8 +78,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).value);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

@@ -154,11 +154,6 @@ public:
{
assert_cast<ColumnFloat64 &>(to).getData().push_back(getBoundingRatio(data(place)));
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

@@ -33,11 +33,6 @@ public:
return "categoricalInformationValue";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void create(AggregateDataPtr place) const override
{
memset(place, 0, sizeOfData());

@@ -63,8 +63,6 @@ public:
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
/// Reset the state to specified value. This function is not the part of common interface.
void set(AggregateDataPtr place, UInt64 new_count)
{
@@ -115,8 +113,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -145,8 +145,6 @@ public:
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -247,8 +247,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

@@ -136,8 +136,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@@ -400,8 +398,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE

@@ -203,8 +203,6 @@ public:
to_offsets.push_back(to_offsets.back() + result_array_size);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

@@ -192,8 +192,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE

@@ -52,8 +52,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@@ -119,8 +117,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
template <typename Data>

@@ -118,8 +118,6 @@ public:
for (auto it = set.begin(); it != set.end(); ++it, ++i)
data_to[old_size + i] = it->getValue();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@@ -255,8 +253,6 @@ public:
deserializeAndInsert(elem.getValue(), data_to);
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
template <>

@@ -369,8 +369,6 @@ public:
offsets_to.push_back(to_tuple.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
String getName() const override { return "histogram"; }
};

@@ -109,8 +109,6 @@ public:
{
return nested_func->isState();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -394,8 +394,6 @@ public:
this->data(place).returnWeights(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt64 param_num;
Float64 learning_rate;

@@ -162,11 +162,6 @@ public:
result_column.push_back(position_of_max_intersections);
}
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

@@ -98,8 +98,6 @@ public:
{
return nested_func->allocatesMemoryInArena();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -731,8 +731,6 @@ public:
{
this->data(place).insertResultInto(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -71,8 +71,6 @@ public:
{
to.insertDefault();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -176,8 +176,6 @@ public:
{
return nested_function->isState();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

@@ -49,11 +49,6 @@ public:
return nested_function->getName() + "OrDefault";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();

@@ -179,8 +179,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
static void assertSecondArg(const DataTypes & types)
{
if constexpr (has_second_arg)

@@ -72,11 +72,6 @@ public:
return nested_function->getName() + "Resample";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();

@@ -144,11 +144,6 @@ public:
offsets_to.push_back(current_offset);
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

@@ -180,8 +180,6 @@ public:
this->data(place).deserialize(buf);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
enum class PatternActionType
{

@@ -109,11 +109,6 @@ public:
return "simpleLinearRegression";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void add(
AggregateDataPtr place,
const IColumn ** columns,

@@ -94,8 +94,6 @@ public:
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -147,8 +147,6 @@ public:
{
this->data(place).publish(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
/** Implementing the varSamp function.
@@ -401,8 +399,6 @@ public:
{
this->data(place).publish(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
/** Implementing the covarSamp function.

@@ -552,8 +552,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 src_scale;
};

@@ -147,8 +147,6 @@ public:
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 scale;
};

@@ -261,8 +261,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
bool keepKey(const T & key) const { return static_cast<const Derived &>(*this).keepKey(key); }
};

@@ -281,7 +281,5 @@ public:
}
bool allocatesMemoryInArena() const override { return true; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -103,8 +103,6 @@ public:
for (auto it = result_vec.begin(); it != result_vec.end(); ++it, ++i)
data_to[old_size + i] = it->key;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@@ -230,8 +228,6 @@ public:
data_to.deserializeAndInsertFromArena(elem.key.data);
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -244,8 +244,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@@ -300,8 +298,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

@@ -109,7 +109,7 @@ struct AggregateFunctionUniqCombinedData : public AggregateFunctionUniqCombinedD
};
/// For String keys, 64 bit hash is always used (both for uniqCombined and uniqCombined64),
/// because of backwards compatibility (64 bit hash was already used for uniqCombined).
template <UInt8 K, typename HashValueType>
struct AggregateFunctionUniqCombinedData<String, K, HashValueType> : public AggregateFunctionUniqCombinedDataWithKey<UInt64 /*always*/, K>
@@ -171,11 +171,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
/** For multiple arguments. To compute, hashes them.
@@ -238,11 +233,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

@@ -184,8 +184,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@@ -248,8 +246,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

@@ -245,11 +245,6 @@ public:
{
assert_cast<ColumnUInt8 &>(to).getData().push_back(getEventLevel(this->data(place)));
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

@@ -148,12 +148,6 @@ public:
addBatchArray(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const = 0;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
* const char * getHeaderFilePath() const override { return __FILE__; }
*/
virtual const char * getHeaderFilePath() const = 0;
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }

@@ -476,6 +476,7 @@ namespace ErrorCodes
extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int CANNOT_SIGQUEUE = 502;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

dbms/src/Common/PipeFDs.cpp (new file)

@@ -0,0 +1,112 @@
#include <Common/PipeFDs.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <algorithm>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PIPE;
extern const int CANNOT_FCNTL;
extern const int LOGICAL_ERROR;
}
void LazyPipeFDs::open()
{
for (int & fd : fds_rw)
if (fd >= 0)
throw Exception("Pipe is already opened", ErrorCodes::LOGICAL_ERROR);
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
#else
if (0 != pipe(fds_rw))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
throwFromErrno("Cannot setup auto-close on exec for read end of pipe", ErrorCodes::CANNOT_FCNTL);
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
throwFromErrno("Cannot setup auto-close on exec for write end of pipe", ErrorCodes::CANNOT_FCNTL);
#endif
}
void LazyPipeFDs::close()
{
for (int & fd : fds_rw)
{
if (fd < 0)
continue;
if (0 != ::close(fd))
throwFromErrno("Cannot close pipe", ErrorCodes::CANNOT_PIPE);
fd = -1;
}
}
PipeFDs::PipeFDs()
{
open();
}
LazyPipeFDs::~LazyPipeFDs()
{
try
{
close();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void LazyPipeFDs::setNonBlocking()
{
int flags = fcntl(fds_rw[1], F_GETFL, 0);
if (-1 == flags)
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
if (-1 == fcntl(fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
}
void LazyPipeFDs::tryIncreaseSize(int desired_size)
{
#if defined(OS_LINUX)
Poco::Logger * log = &Poco::Logger::get("Pipe");
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
int pipe_size = fcntl(fds_rw[1], F_GETPIPE_SZ);
if (-1 == pipe_size)
{
if (errno == EINVAL)
{
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
/// It will work nevertheless.
}
else
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
}
else
{
for (errno = 0; errno != EPERM && pipe_size < desired_size; pipe_size *= 2)
if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size)));
}
#else
(void)desired_size;
#endif
}
}

dbms/src/Common/PipeFDs.h (new file)

@@ -0,0 +1,35 @@
#pragma once
#include <cstddef>
namespace DB
{
/** Struct containing a pipe with lazy initialization.
* Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access
* pipe's file descriptors.
*/
struct LazyPipeFDs
{
int fds_rw[2] = {-1, -1};
void open();
void close();
void setNonBlocking();
void tryIncreaseSize(int desired_size);
~LazyPipeFDs();
};
/** Struct which opens new pipe on creation and closes it on destruction.
* Use `fds_rw` field to access pipe's file descriptors.
*/
struct PipeFDs : public LazyPipeFDs
{
PipeFDs();
};
}
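For reference, a minimal usage sketch of these pipe helpers (hypothetical caller code; it assumes only the declarations above, and relies on the methods throwing on failure as shown in PipeFDs.cpp):

#include <Common/PipeFDs.h>
#include <unistd.h>

void example()
{
    DB::LazyPipeFDs pipe;
    pipe.open();                    /// creates the pipe with close-on-exec set
    pipe.setNonBlocking();          /// writes now fail with EAGAIN instead of blocking
    pipe.tryIncreaseSize(1 << 20);  /// best effort; no-op outside Linux

    char byte = 42;
    (void)::write(pipe.fds_rw[1], &byte, 1);     /// fds_rw[1] is the write end
    char received = 0;
    (void)::read(pipe.fds_rw[0], &received, 1);  /// fds_rw[0] is the read end

    pipe.close();  /// also done by the destructor, which logs instead of throwing
}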

@@ -1,12 +1,12 @@
#include "QueryProfiler.h"
#include <random>
#include <common/Pipe.h>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <Common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/thread_local_rng.h>
@@ -22,7 +22,7 @@ namespace ProfileEvents
namespace DB
{
extern LazyPipe trace_pipe;
extern LazyPipeFDs trace_pipe;
namespace
{

@@ -4,11 +4,11 @@
#include <dlfcn.h>
#include <Common/Exception.h>
#include <Common/ShellCommand.h>
#include <Common/PipeFDs.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include <port/unistd.h>
#include <csignal>
#include <common/Pipe.h>
namespace
{
@@ -66,9 +66,9 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, c
if (!real_vfork)
throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM);
Pipe pipe_stdin;
Pipe pipe_stdout;
Pipe pipe_stderr;
PipeFDs pipe_stdin;
PipeFDs pipe_stdout;
PipeFDs pipe_stderr;
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();

@@ -24,7 +24,7 @@ public:
/// Whether the current process has permissions (sudo or cap_net_admin capabilities) to get taskstats info
static bool checkPermissions();
#if defined(__linux__)
#if defined(OS_LINUX)
private:
int netlink_socket_fd = -1;
UInt16 taskstats_family_id = 0;

@@ -2,7 +2,7 @@
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
@@ -19,13 +19,12 @@
namespace DB
{
LazyPipe trace_pipe;
LazyPipeFDs trace_pipe;
namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
extern const int CANNOT_FCNTL;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
@@ -40,36 +39,8 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0);
if (-1 == flags)
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
#if defined(OS_LINUX)
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ);
if (-1 == pipe_size)
{
if (errno == EINVAL)
{
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
/// It will work nevertheless.
}
else
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
}
else
{
constexpr int max_pipe_capacity_to_set = 1048576;
for (errno = 0; errno != EPERM && pipe_size < max_pipe_capacity_to_set; pipe_size *= 2)
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set)));
}
#endif
trace_pipe.setNonBlocking();
trace_pipe.tryIncreaseSize(1 << 20); /// 1 MiB, the same limit as the removed max_pipe_capacity_to_set constant (1048576).
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}

@@ -344,16 +344,8 @@ void DatabaseOrdinary::alterTable(
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
if (ast_create_query.columns_list->indices)
ast_create_query.columns_list->replace(ast_create_query.columns_list->indices, new_indices);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->indices, new_indices);
if (ast_create_query.columns_list->constraints)
ast_create_query.columns_list->replace(ast_create_query.columns_list->constraints, new_constraints);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->constraints, new_constraints);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);

@@ -76,6 +76,9 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
/// Processors are not supported here yet.
context.getSettingsRef().experimental_use_processors = false;
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context.makeQueryContext();
}
@@ -100,6 +103,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{other.load_all_query}
{
context.makeQueryContext();
}
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()

@@ -83,6 +83,10 @@ public:
{
abort();
}
else if (mode == "std::terminate")
{
std::terminate();
}
else if (mode == "use after free")
{
int * x_ptr;

@@ -46,7 +46,14 @@ BrotliWriteBuffer::BrotliWriteBuffer(WriteBuffer & out_, int compression_level,
BrotliWriteBuffer::~BrotliWriteBuffer()
{
finish();
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void BrotliWriteBuffer::nextImpl()
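The same log-and-swallow destructor pattern is applied to WriteBufferFromVector and WriteBufferValidUTF8 below. A minimal sketch of the idiom (illustrative type name, not from the sources):

struct SomeWriteBuffer
{
    void finish();  /// flushes buffered data; may throw

    ~SomeWriteBuffer()
    {
        try
        {
            finish();
        }
        catch (...)
        {
            /// An exception escaping a destructor during stack unwinding would
            /// call std::terminate, so it is logged and suppressed instead.
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
};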

@@ -8,10 +8,6 @@ WriteBufferFromFileBase::WriteBufferFromFileBase(size_t buf_size, char * existin
{
}
WriteBufferFromFileBase::~WriteBufferFromFileBase()
{
}
off_t WriteBufferFromFileBase::seek(off_t off, int whence)
{
return doSeek(off, whence);

@@ -13,12 +13,12 @@ class WriteBufferFromFileBase : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
virtual ~WriteBufferFromFileBase();
~WriteBufferFromFileBase() override = default;
off_t seek(off_t off, int whence = SEEK_SET);
void truncate(off_t length = 0);
virtual off_t getPositionInFile() = 0;
virtual void sync() = 0;
void sync() override = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;

@@ -89,8 +89,14 @@ public:
~WriteBufferFromVector() override
{
if (!is_finished)
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};

@@ -133,4 +133,17 @@ void WriteBufferValidUTF8::finish()
putReplacement();
}
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

@@ -35,10 +35,7 @@ public:
const char * replacement_ = "\xEF\xBF\xBD",
size_t size = DEFAULT_SIZE);
virtual ~WriteBufferValidUTF8() override
{
finish();
}
~WriteBufferValidUTF8() override;
};
}

@@ -98,14 +98,6 @@ NameSet AnalyzedJoin::getQualifiedColumnsSet() const
return out;
}
NameSet AnalyzedJoin::getOriginalColumnsSet() const
{
NameSet out;
for (const auto & names : original_names)
out.insert(names.second);
return out;
}
NamesWithAliases AnalyzedJoin::getNamesWithAliases(const NameSet & required_columns) const
{
NamesWithAliases out;

@@ -96,7 +96,6 @@ public:
bool hasOn() const { return table_join.on_expression != nullptr; }
NameSet getQualifiedColumnsSet() const;
NameSet getOriginalColumnsSet() const;
NamesWithAliases getNamesWithAliases(const NameSet & required_columns) const;
NamesWithAliases getRequiredColumns(const Block & sample, const Names & action_columns) const;

@@ -170,11 +170,22 @@ size_t CollectJoinOnKeysMatcher::getTableForIdentifiers(std::vector<const ASTIde
if (!membership)
{
const String & name = identifier->name;
bool in_left_table = data.source_columns.count(name);
bool in_right_table = data.joined_columns.count(name);
bool in_left_table = data.left_table.hasColumn(name);
bool in_right_table = data.right_table.hasColumn(name);
if (in_left_table && in_right_table)
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
{
/// Relax ambiguous check for multiple JOINs
if (auto original_name = IdentifierSemantic::uncover(*identifier))
{
auto match = IdentifierSemantic::canReferColumnToTable(*original_name, data.right_table.table);
if (match == IdentifierSemantic::ColumnMatch::NoMatch)
in_right_table = false;
in_left_table = !in_right_table;
}
else
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
}
if (in_left_table)
membership = 1;

@@ -3,6 +3,7 @@
#include <Core/Names.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/Aliases.h>
@@ -25,8 +26,8 @@ public:
struct Data
{
AnalyzedJoin & analyzed_join;
const NameSet & source_columns;
const NameSet & joined_columns;
const TableWithColumnNames & left_table;
const TableWithColumnNames & right_table;
const Aliases & aliases;
const bool is_asof{false};
ASTPtr asof_left_key{};

@@ -53,6 +53,20 @@ struct TableWithColumnNames
for (auto & column : addition)
hidden_columns.push_back(column.name);
}
bool hasColumn(const String & name) const
{
if (columns_set.empty())
{
columns_set.insert(columns.begin(), columns.end());
columns_set.insert(hidden_columns.begin(), hidden_columns.end());
}
return columns_set.count(name);
}
private:
mutable NameSet columns_set;
};
std::vector<DatabaseAndTableWithAlias> getDatabaseAndTables(const ASTSelectQuery & select_query, const String & current_database);

@@ -92,6 +92,22 @@ std::optional<String> IdentifierSemantic::getTableName(const ASTPtr & ast)
return {};
}
std::optional<ASTIdentifier> IdentifierSemantic::uncover(const ASTIdentifier & identifier)
{
if (identifier.semantic->covered)
{
std::vector<String> name_parts = identifier.name_parts;
return ASTIdentifier(std::move(name_parts));
}
return {};
}
void IdentifierSemantic::coverName(ASTIdentifier & identifier, const String & alias)
{
identifier.setShortName(alias);
identifier.semantic->covered = true;
}
bool IdentifierSemantic::canBeAlias(const ASTIdentifier & identifier)
{
return identifier.semantic->can_be_alias;
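A purely illustrative sketch of the new cover/uncover semantics (the identifier and alias names are invented; it assumes the name-parts ASTIdentifier constructor used by uncover above):

ASTIdentifier id(std::vector<String>{"t2", "x"});    /// compound name "t2.x"
IdentifierSemantic::coverName(id, "--t2.x");         /// a short alias now shadows it
if (auto original = IdentifierSemantic::uncover(id)) /// covered => original name back
{
    /// `original` refers to the compound name "t2.x" again, which is how
    /// CollectJoinOnKeysMatcher above re-checks which table a column belongs to.
}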

@@ -12,6 +12,7 @@ struct IdentifierSemanticImpl
{
bool special = false; /// for now it's 'not a column': tables, subselects and some special stuff like FORMAT
bool can_be_alias = true; /// if it's a cropped name it could not be an alias
bool covered = false; /// real (compound) name is hidden by an alias (short name)
std::optional<size_t> membership; /// table position in join
};
@@ -43,6 +44,8 @@ struct IdentifierSemantic
static void setColumnLongName(ASTIdentifier & identifier, const DatabaseAndTableWithAlias & db_and_table);
static bool canBeAlias(const ASTIdentifier & identifier);
static void setMembership(ASTIdentifier &, size_t table_no);
static void coverName(ASTIdentifier &, const String & alias);
static std::optional<ASTIdentifier> uncover(const ASTIdentifier & identifier);
static std::optional<size_t> getMembership(const ASTIdentifier & identifier);
static bool chooseTable(const ASTIdentifier &, const std::vector<DatabaseAndTableWithAlias> & tables, size_t & best_table_pos,
bool ambiguous = false);

@@ -21,7 +21,6 @@
#include <Parsers/parseQuery.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
@@ -36,7 +35,6 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
@@ -97,29 +95,20 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
throw Exception("Database " + database_name + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
String database_engine_name;
if (!create.storage)
{
database_engine_name = "Ordinary"; /// Default database engine.
auto engine = std::make_shared<ASTFunction>();
engine->name = database_engine_name;
auto storage = std::make_shared<ASTStorage>();
engine->name = "Ordinary";
storage->set(storage->engine, engine);
create.set(create.storage, storage);
}
else
else if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
const ASTStorage & storage = *create.storage;
const ASTFunction & engine = *storage.engine;
/// Currently, there are no database engines, that support any arguments.
if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
std::stringstream ostr;
formatAST(storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
database_engine_name = engine.name;
std::stringstream ostr;
formatAST(*create.storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
String database_name_escaped = escapeForFileName(database_name);
@@ -155,19 +144,27 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
out.close();
}
bool added = false;
bool renamed = false;
try
{
context.addDatabase(database_name, database);
added = true;
if (need_write_metadata)
{
Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path);
renamed = true;
}
database->loadStoredObjects(context, has_force_restore_data_flag);
}
catch (...)
{
if (need_write_metadata)
if (renamed)
Poco::File(metadata_file_tmp_path).remove();
if (added)
context.detachDatabase(database_name);
throw;
}
@@ -396,84 +393,97 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
}
ColumnsDescription InterpreterCreateQuery::setProperties(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
TableProperties properties;
TableStructureReadLockHolder as_storage_lock;
if (create.columns_list)
{
if (create.columns_list->columns)
columns = getColumnsDescription(*create.columns_list->columns, context);
properties.columns = getColumnsDescription(*create.columns_list->columns, context);
if (create.columns_list->indices)
for (const auto & index : create.columns_list->indices->children)
indices.indices.push_back(
properties.indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (create.columns_list->constraints)
for (const auto & constraint : create.columns_list->constraints->children)
constraints.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
properties.constraints = getConstraintsDescription(create.columns_list->constraints);
}
else if (!create.as_table.empty())
{
columns = as_storage->getColumns();
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
StoragePtr as_storage = context.getTable(as_database_name, create.as_table);
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines.
/// We should not copy them for other storages.
if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
indices = as_storage->getIndices();
properties.indices = as_storage->getIndices();
constraints = as_storage->getConstraints();
properties.constraints = as_storage->getConstraints();
}
else if (create.select)
{
columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
Block as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (create.as_table_function)
return {};
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(columns);
ASTPtr new_indices = formatIndices(indices);
ASTPtr new_constraints = formatConstraints(constraints);
if (!create.columns_list)
{
auto new_columns_list = std::make_shared<ASTColumns>();
create.set(create.columns_list, new_columns_list);
}
create.set(create.columns_list, std::make_shared<ASTColumns>());
if (create.columns_list->columns)
create.columns_list->replace(create.columns_list->columns, new_columns);
else
create.columns_list->set(create.columns_list->columns, new_columns);
ASTPtr new_columns = formatColumns(properties.columns);
ASTPtr new_indices = formatIndices(properties.indices);
ASTPtr new_constraints = formatConstraints(properties.constraints);
if (new_indices && create.columns_list->indices)
create.columns_list->replace(create.columns_list->indices, new_indices);
else if (new_indices)
create.columns_list->set(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->columns, new_columns);
create.columns_list->setOrReplace(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->constraints, new_constraints);
if (new_constraints && create.columns_list->constraints)
create.columns_list->replace(create.columns_list->constraints, new_constraints);
else if (new_constraints)
create.columns_list->set(create.columns_list->constraints, new_constraints);
validateTableStructure(create, properties);
/// Set the table engine if it was not specified explicitly.
setEngine(create);
return properties;
}
void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties) const
{
/// Check for duplicates
std::set<String> all_columns;
for (const auto & column : columns)
for (const auto & column : properties.columns)
{
if (!all_columns.emplace(column.name).second)
throw Exception("Column " + backQuoteIfNeed(column.name) + " already exists", ErrorCodes::DUPLICATE_COLUMN);
}
return columns;
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : properties.columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default "
"due to expected negative impact on performance. "
"It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
@@ -535,23 +545,19 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
throw Exception("Temporary tables cannot be inside a database. You should not specify a database for a temporary table.",
ErrorCodes::BAD_DATABASE_FOR_TEMPORARY_TABLE);
String path = context.getPath();
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns_list)
{
// Table SQL definition is available even if the table is detached
auto query = context.getCreateTableQuery(database_name, table_name);
auto query = context.getCreateTableQuery(create.database, create.table);
create = query->as<ASTCreateQuery &>(); // Copy the saved create query, but use ATTACH instead of CREATE
create.attach = true;
}
if (create.to_database.empty())
String current_database = context.getCurrentDatabase();
if (!create.temporary && create.database.empty())
create.database = current_database;
if (!create.to_table.empty() && create.to_database.empty())
create.to_database = current_database;
if (create.select && (create.is_view || create.is_materialized_view || create.is_live_view))
@@ -560,26 +566,63 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.select);
}
Block as_select_sample;
if (create.select && (!create.attach || !create.columns_list))
as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = setProperties(create);
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
/// Actually creates table
bool created = doCreateTable(create, properties);
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
if (!created) /// Table already exists
return {};
if (!as_table_name.empty())
return fillTableIfNeeded(create);
}
bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties)
{
std::unique_ptr<DDLGuard> guard;
String data_path;
DatabasePtr database;
const String & table_name = create.table;
bool need_add_to_database = !create.temporary || create.is_live_view;
if (need_add_to_database)
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
database = context.getDatabase(create.database);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If the table doesn't exist, one thread is creating the table, while others wait in DDLGuard.
*/
guard = context.getDDLGuard(create.database, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
/// TODO Check structure of table
if (create.if_not_exists)
return false;
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = create.database;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return false;
ColumnsDescription columns;
ConstraintsDescription constraints;
StoragePtr res;
if (create.as_table_function)
{
const auto & table_function = create.as_table_function->as<ASTFunction &>();
@@ -588,99 +631,38 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
else
{
/// Set and retrieve list of columns.
columns = setProperties(create, as_select_sample, as_storage);
constraints = getConstraintsDescription(create.columns_list->constraints);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
res = StorageFactory::instance().get(create,
data_path,
table_name,
create.database,
context,
context.getGlobalContext(),
properties.columns,
properties.constraints,
create.attach,
false);
}
{
std::unique_ptr<DDLGuard> guard;
if (need_add_to_database)
database->createTable(context, table_name, res, query_ptr);
else
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
String data_path;
DatabasePtr database;
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
if (!create.temporary || create.is_live_view)
{
database = context.getDatabase(database_name);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If the table doesn't exist, one thread is creating the table, while others wait in DDLGuard.
*/
guard = context.getDDLGuard(database_name, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
/// TODO Check structure of table
if (create.if_not_exists)
return {};
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = database_name;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
if (!create.as_table_function)
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
constraints,
create.attach,
false);
}
if (create.temporary && !create.is_live_view)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
res->startup();
}
res->startup();
return true;
}
BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
{
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_view && !create.is_live_view && (!create.is_materialized_view || create.is_populate))
@@ -688,9 +670,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto insert = std::make_shared<ASTInsertQuery>();
if (!create.temporary)
insert->database = database_name;
insert->database = create.database;
insert->table = table_name;
insert->table = create.table;
insert->select = create.select->clone();
if (create.temporary && !context.getSessionContext().hasQueryContext())

@@ -49,15 +49,28 @@ public:
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
private:
struct TableProperties
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
};
BlockIO createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create);
BlockIO createDictionary(ASTCreateQuery & create);
/// Calculate list of columns, constraints, indices, etc... of table and return columns.
ColumnsDescription setProperties(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties setProperties(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void setEngine(ASTCreateQuery & create) const;
void checkAccess(const ASTCreateQuery & create);
/// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false.
bool doCreateTable(const ASTCreateQuery & create, const TableProperties & properties);
/// Inserts data in created table if it's CREATE ... SELECT
BlockIO fillTableIfNeeded(const ASTCreateQuery & create);
ASTPtr query_ptr;
Context & context;

@@ -159,7 +159,7 @@ struct ColumnAliasesMatcher
aliases[alias] = long_name;
rev_aliases[long_name].push_back(alias);
identifier->setShortName(alias);
IdentifierSemantic::coverName(*identifier, alias);
if (is_public)
{
identifier->setAlias(long_name);
@@ -177,7 +177,7 @@ struct ColumnAliasesMatcher
if (is_public && allowed_long_names.count(long_name))
; /// leave original name unchanged for correct output
else
identifier->setShortName(it->second[0]);
IdentifierSemantic::coverName(*identifier, it->second[0]);
}
}
}
@@ -229,7 +229,7 @@ struct ColumnAliasesMatcher
if (!last_table)
{
node.setShortName(alias);
IdentifierSemantic::coverName(node, alias);
node.setAlias("");
}
}

@@ -532,8 +532,8 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
}
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
const Aliases & aliases)
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query,
const std::vector<TableWithColumnNames> & tables, const Aliases & aliases)
{
const ASTTablesInSelectQueryElement * node = select_query.join();
if (!node)
@@ -551,7 +551,7 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
{
bool is_asof = (table_join.strictness == ASTTableJoin::Strictness::Asof);
CollectJoinOnKeysVisitor::Data data{analyzed_join, source_columns, analyzed_join.getOriginalColumnsSet(), aliases, is_asof};
CollectJoinOnKeysVisitor::Data data{analyzed_join, tables[0], tables[1], aliases, is_asof};
CollectJoinOnKeysVisitor(data).visit(table_join.on_expression);
if (!data.has_some)
throw Exception("Cannot get JOIN keys from JOIN ON section: " + queryToString(table_join.on_expression),
@@ -820,6 +820,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
if (storage)
collectSourceColumns(storage->getColumns(), result.source_columns, (select_query != nullptr));
NameSet source_columns_set = removeDuplicateColumns(result.source_columns);
std::vector<TableWithColumnNames> tables_with_columns;
if (select_query)
{
@@ -837,7 +838,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
}
std::vector<const ASTTableExpression *> table_expressions = getTableExpressions(*select_query);
auto tables_with_columns = getTablesWithColumns(table_expressions, context);
tables_with_columns = getTablesWithColumns(table_expressions, context);
if (tables_with_columns.empty())
{
@@ -935,7 +936,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
setJoinStrictness(*select_query, settings.join_default_strictness, settings.any_join_distinct_right_table_keys,
result.analyzed_join->table_join);
collectJoinedColumns(*result.analyzed_join, *select_query, source_columns_set, result.aliases);
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
}
result.aggregates = getAggregates(query);

@@ -197,6 +197,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
memory_tracker.setParent(nullptr);
query_id.clear();
query_context = nullptr;
thread_group.reset();

@@ -146,6 +146,15 @@ public:
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
}
template <typename T>
void setOrReplace(T * & field, const ASTPtr & child)
{
if (field)
replace(field, child);
else
set(field, child);
}
/// Convert to a string.
/// Format settings.
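The helper collapses the set-or-replace branching that callers previously wrote by hand, as in the DatabaseOrdinary::alterTable hunk above. Before/after, using the names from that hunk:

/// Before: every caller branched on whether the child already existed.
if (ast_create_query.columns_list->indices)
    ast_create_query.columns_list->replace(ast_create_query.columns_list->indices, new_indices);
else
    ast_create_query.columns_list->set(ast_create_query.columns_list->indices, new_indices);

/// After: a single call covers both cases.
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);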

@@ -176,7 +176,12 @@ public:
/// Check can output.
if (output.isFinished())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (!output.canPush())
return Status::PortFull;

@@ -134,7 +134,10 @@ IProcessor::Status MergingSortedTransform::prepare()
auto chunk = input.pull();
if (!chunk.hasRows())
{
all_inputs_has_data = false;
if (!input.isFinished())
all_inputs_has_data = false;
continue;
}
@@ -176,13 +179,14 @@ IProcessor::Status MergingSortedTransform::prepare()
return Status::NeedData;
auto chunk = input.pull();
if (!chunk.hasRows())
if (!chunk.hasRows() && !input.isFinished())
return Status::NeedData;
updateCursor(std::move(chunk), next_input_to_read);
pushToQueue(next_input_to_read);
need_data = false;
}
need_data = false;
}
return Status::Ready;

@@ -80,11 +80,15 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, const std::string & name_, const ConnectionPoolPtr & pool_, ActionBlocker & monitor_blocker_)
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
/// It's important to initialize members before `thread` to avoid a race.
: storage(storage_)
, pool{pool_}
, name{name_}
, pool(std::move(pool_))
, name(std::move(name_))
, path{storage.path + name + '/'}
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
@@ -92,10 +96,6 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
{
const Settings & settings = storage.global_context.getSettingsRef();
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
min_batched_block_size_rows = settings.min_insert_block_size_rows;
min_batched_block_size_bytes = settings.min_insert_block_size_bytes;
}
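Moving these assignments into the member initializer list matters because C++ initializes members in declaration order: everything a background thread may read must be initialized before the thread itself starts. A minimal sketch of the pattern (illustrative names, not from the sources):

#include <thread>

struct Monitor
{
    const bool should_batch;  /// declared first, so initialized first
    std::thread worker;       /// declared last: started only after the flags are set

    explicit Monitor(bool batch)
        : should_batch(batch)
        , worker([this] { /* safe: should_batch is already initialized */ })
    {
    }

    ~Monitor() { worker.join(); }
};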

@@ -20,7 +20,7 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, const std::string & name_, const ConnectionPoolPtr & pool_, ActionBlocker & monitor_blocker_);
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
~StorageDistributedDirectoryMonitor();
@@ -44,22 +44,22 @@ private:
std::string getLoggerName() const;
StorageDistributed & storage;
ConnectionPoolPtr pool;
std::string name;
const ConnectionPoolPtr pool;
const std::string name;
std::string path;
bool should_batch_inserts = false;
size_t min_batched_block_size_rows = 0;
size_t min_batched_block_size_bytes = 0;
const bool should_batch_inserts = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;
String current_batch_file_path;
struct BatchHeader;
struct Batch;
size_t error_count{};
std::chrono::milliseconds default_sleep_time;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
std::chrono::milliseconds max_sleep_time;
const std::chrono::milliseconds max_sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
std::atomic<bool> quit {false};
std::mutex mutex;

@@ -39,7 +39,7 @@ void StorageFactory::registerStorage(const std::string & name, Creator creator)
StoragePtr StorageFactory::get(
ASTCreateQuery & query,
const ASTCreateQuery & query,
const String & data_path,
const String & table_name,
const String & database_name,

@@ -47,7 +47,7 @@ public:
using Creator = std::function<StoragePtr(const Arguments & arguments)>;
StoragePtr get(
ASTCreateQuery & query,
const ASTCreateQuery & query,
const String & data_path,
const String & table_name,
const String & database_name,

@@ -0,0 +1,219 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include <signal.h>
#include <poll.h>
#include <mutex>
#include <filesystem>
#include <ext/scope_guard.h>
#include <Storages/System/StorageSystemStackTrace.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <Common/PipeFDs.h>
#include <common/getThreadNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SIGQUEUE;
extern const int CANNOT_MANIPULATE_SIGSET;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int LOGICAL_ERROR;
}
namespace
{
const pid_t expected_pid = getpid();
const int sig = SIGRTMIN;
int sequence_num = 0; /// For messages sent via pipe.
UInt32 thread_number{0};
std::optional<StackTrace> stack_trace;
static constexpr size_t max_query_id_size = 128;
char query_id_data[max_query_id_size];
size_t query_id_size = 0;
LazyPipeFDs notification_pipe;
void signalHandler(int, siginfo_t * info, void * context)
{
/// In case a malicious user is sending signals manually (for whatever reason).
/// If we don't check, it may break our synchronization.
if (info->si_pid != expected_pid)
return;
/// Signal received too late.
if (info->si_value.sival_int != sequence_num)
return;
/// All these methods are signal-safe.
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
stack_trace.emplace(signal_context);
thread_number = getThreadNumber();
StringRef query_id = CurrentThread::getQueryId();
query_id_size = std::min(query_id.size, max_query_id_size);
memcpy(query_id_data, query_id.data, query_id_size);
int notification_num = info->si_value.sival_int;
ssize_t res = ::write(notification_pipe.fds_rw[1], &notification_num, sizeof(notification_num));
/// We cannot do anything if write failed.
(void)res;
}
/// Wait for data in pipe and read it.
bool wait(int timeout_ms)
{
while (true)
{
int fd = notification_pipe.fds_rw[0];
pollfd poll_fd{fd, POLLIN, 0};
int poll_res = poll(&poll_fd, 1, timeout_ms);
if (poll_res < 0)
{
if (errno == EINTR)
{
--timeout_ms; /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting.
if (timeout_ms == 0)
return false;
continue;
}
throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (poll_res == 0)
return false;
int notification_num = 0;
ssize_t read_res = ::read(fd, &notification_num, sizeof(notification_num));
if (read_res < 0)
{
if (errno == EINTR)
continue;
throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (read_res == sizeof(notification_num))
{
if (notification_num == sequence_num)
return true;
else
continue; /// Drain delayed notifications.
}
throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
}
}
}
StorageSystemStackTrace::StorageSystemStackTrace(const String & name_)
: IStorageSystemOneBlock<StorageSystemStackTrace>(name_)
{
notification_pipe.open();
/// Setup signal handler.
struct sigaction sa{};
sa.sa_sigaction = signalHandler;
sa.sa_flags = SA_SIGINFO;
if (sigemptyset(&sa.sa_mask))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaddset(&sa.sa_mask, sig))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaction(sig, &sa, nullptr))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
}
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
{
return
{
{ "thread_number", std::make_shared<DataTypeUInt32>() },
{ "query_id", std::make_shared<DataTypeString>() },
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
};
}
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
/// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).
/// Obviously, results for different threads may be out of sync.
/// There is no better way to enumerate threads in a process other than looking into procfs.
std::filesystem::directory_iterator end;
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
{
pid_t tid = parse<pid_t>(it->path().filename());
sigval sig_value{};
sig_value.sival_int = sequence_num;
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may have already finished.
if (ESRCH == errno)
continue;
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
/// Just in case, wait for the pipe with a timeout, in case the signal didn't get processed.
if (wait(100))
{
size_t stack_trace_size = stack_trace->getSize();
size_t stack_trace_offset = stack_trace->getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace->getFrames()[i]));
res_columns[0]->insert(thread_number);
res_columns[1]->insertData(query_id_data, query_id_size);
res_columns[2]->insert(arr);
}
else
{
/// Cannot obtain a stack trace; create a record in the result nevertheless.
res_columns[0]->insert(tid); /// TODO Replace all thread numbers with OS thread IDs.
res_columns[1]->insertDefault();
res_columns[2]->insertDefault();
}
sequence_num = static_cast<int>(static_cast<unsigned>(sequence_num) + 1);
}
}
}
#endif
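For reference, a usage sketch of the new table. The symbolization shown is an assumption about the surrounding build: `addressToSymbol` and `demangle` are separate introspection functions gated by the `allow_introspection_functions` setting, not part of this diff.

```bash
# Sketch only: list what every server thread is executing right now.
# Columns thread_number, query_id and trace come from getNamesAndTypes() above;
# the demangling pipeline is an assumption and needs symbols in the binary.
clickhouse-client --allow_introspection_functions=1 --query "
    SELECT
        thread_number,
        query_id,
        arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') AS stack
    FROM system.stack_trace
    LIMIT 5
    FORMAT Vertical"
```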

View File

@ -0,0 +1,37 @@
#pragma once
#ifdef OS_LINUX /// Because of the 'sigqueue' function and RT signals.
#include <mutex>
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
/// Allows introspecting the stack traces of all server threads.
/// It acts like an embedded debugger.
/// Only one instance of this table can be used at a time.
class StorageSystemStackTrace : public ext::shared_ptr_helper<StorageSystemStackTrace>, public IStorageSystemOneBlock<StorageSystemStackTrace>
{
friend struct ext::shared_ptr_helper<StorageSystemStackTrace>;
public:
String getName() const override { return "SystemStackTrace"; }
static NamesAndTypesList getNamesAndTypes();
StorageSystemStackTrace(const String & name_);
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
mutable std::mutex mutex;
};
}
#endif

View File

@ -39,6 +39,10 @@
#include <Storages/System/StorageSystemDisks.h>
#include <Storages/System/StorageSystemStoragePolicies.h>
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
#endif
namespace DB
{
@ -65,6 +69,9 @@ void attachSystemTablesLocal(IDatabase & system_database)
system_database.attachTable("collations", StorageSystemCollations::create("collations"));
system_database.attachTable("table_engines", StorageSystemTableEngines::create("table_engines"));
system_database.attachTable("contributors", StorageSystemContributors::create("contributors"));
#ifdef OS_LINUX
system_database.attachTable("stack_trace", StorageSystemStackTrace::create("stack_trace"));
#endif
}
void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)

View File

@ -16,7 +16,17 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutat
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" &
sleep 0.1
check_query1="SELECT substr(latest_fail_reason, 1, 8) as ErrorCode FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND ErrorCode != ''"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
sleep 0.1
done
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation'"
@ -31,9 +41,19 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" &
check_query2="SELECT substr(latest_fail_reason, 1, 8) as ErrorCode FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt' AND ErrorCode != ''"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
sleep 0.1
done
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"
sleep 0.1
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"
wait

View File

@ -34,7 +34,7 @@ $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill' ASYNC" &>/dev/nul
sleep 1
# Kill $query_for_pending SYNC. This query is not a blocker, so it should be killed fast.
timeout 5 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending' SYNC" &>/dev/null
timeout 10 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending' SYNC" &>/dev/null
# Both queries have to be killed; it doesn't matter whether the kill is SYNC or ASYNC.
for run in {1..15}

View File

@ -0,0 +1,16 @@
drop table if exists default.test_01051_d;
drop table if exists default.test_view_01051_d;
drop dictionary if exists default.test_dict_01051_d;
create table default.test_01051_d (key UInt64, value String) engine = MergeTree order by key;
create view default.test_view_01051_d (key UInt64, value String) as select k2 + 1 as key, v2 || '_x' as value from (select key + 2 as k2, value || '_y' as v2 from default.test_01051_d);
insert into default.test_01051_d values (1, 'a');
create dictionary default.test_dict_01051_d (key UInt64, value String) primary key key source(clickhouse(host 'localhost' port '9000' user 'default' password '' db 'default' table 'test_view_01051_d')) layout(flat()) lifetime(100500);
select dictGet('default.test_dict_01051_d', 'value', toUInt64(4));
drop table if exists default.test_01051_d;
drop table if exists default.test_view_01051_d;
drop dictionary if exists default.test_dict_01051_d;

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS a;
DROP TABLE IF EXISTS b;
DROP TABLE IF EXISTS c;
CREATE TABLE a (x UInt64) ENGINE = Memory;
CREATE TABLE b (x UInt64) ENGINE = Memory;
CREATE TABLE c (x UInt64) ENGINE = Memory;
SET enable_optimize_predicate_expression = 0;
SELECT a.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON a.x = c.x;
SELECT a.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON b.x = c.x;
SELECT b.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON b.x = c.x;
SELECT c.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON b.x = c.x;
DROP TABLE a;
DROP TABLE b;
DROP TABLE c;

View File

@ -11,7 +11,7 @@ install_packages() {
}
download_data() {
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
clickhouse-client --query "ATTACH DATABASE IF NOT EXISTS datasets ENGINE = Ordinary"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS test"
/s3downloader --dataset-names $OPEN_DATASETS
/s3downloader --dataset-names $PRIVATE_DATASETS --url 'https://s3.mds.yandex.net/clickhouse-private-datasets'
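The CREATE-to-ATTACH switch here (and in the hunks below) appears intended to register the pre-downloaded data under the legacy Ordinary database layout instead of creating fresh metadata. A minimal sketch, assuming s3downloader has already populated the server's data and metadata directories for `datasets`:

```bash
# Sketch under the assumption that /var/lib/clickhouse/{metadata,data}/datasets
# already exist on disk from the download step.
clickhouse-client --query "ATTACH DATABASE IF NOT EXISTS datasets ENGINE = Ordinary"
clickhouse-client --query "SHOW TABLES FROM datasets"
```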

View File

@ -39,7 +39,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
&& /s3downloader --dataset-names $DATASETS \
&& chmod 777 -R /var/lib/clickhouse \
&& clickhouse-client --query "SHOW DATABASES" \
&& clickhouse-client --query "CREATE DATABASE datasets" \
&& clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary" \
&& clickhouse-client --query "CREATE DATABASE test" \
&& service clickhouse-server restart && sleep 5 \
&& clickhouse-client --query "SHOW TABLES FROM datasets" \

View File

@ -81,7 +81,7 @@ while /bin/true; do
done &
LLVM_PROFILE_FILE='client_%h_%p_%m.profraw' clickhouse-client --query "SHOW DATABASES"
LLVM_PROFILE_FILE='client_%h_%p_%m.profraw' clickhouse-client --query "CREATE DATABASE datasets"
LLVM_PROFILE_FILE='client_%h_%p_%m.profraw' clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary"
LLVM_PROFILE_FILE='client_%h_%p_%m.profraw' clickhouse-client --query "CREATE DATABASE test"
kill_clickhouse

View File

@ -39,7 +39,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
service clickhouse-server start && sleep 5 \
&& /s3downloader --dataset-names $DATASETS \
&& chmod 777 -R /var/lib/clickhouse \
&& clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets" \
&& clickhouse-client --query "ATTACH DATABASE IF NOT EXISTS datasets ENGINE = Ordinary" \
&& clickhouse-client --query "CREATE DATABASE IF NOT EXISTS test" \
&& service clickhouse-server restart && sleep 5 \
&& clickhouse-client --query "SHOW TABLES FROM datasets" \

View File

@ -3,7 +3,7 @@
The HTTP interface lets you use ClickHouse on any platform from any programming language. We use it for working from Java and Perl, as well as shell scripts. In other departments, the HTTP interface is used from Perl, Python, and Go. The HTTP interface is more limited than the native interface, but it has better compatibility.
By default, clickhouse-server listens for HTTP on port 8123 (this can be changed in the config).
If you make a GET / request without parameters, it returns the string "Ok" (with a line feed at the end). You can use this in health-check scripts.
If you make a GET / request without parameters, it returns the string "Ok." (with a line feed at the end). You can use this in health-check scripts.
```bash
$ curl 'http://localhost:8123/'
Ok.
```

View File

@ -2,9 +2,9 @@
# HTTP interface
The HTTP interface lets you use ClickHpuse on any platform from any programming language. We use this interface for working from Java and Perl, as well as from the shell. In other departments, the HTTP interface is used from Perl, Python, and Go. The HTTP interface is more limited than the native interface, but it has better compatibility.
The HTTP interface lets you use ClickHouse on any platform from any programming language. We use this interface for working from Java and Perl, as well as from the shell. In other departments, the HTTP interface is used from Perl, Python, and Go. The HTTP interface is more limited than the native interface, but it has better compatibility.
By default, clickhouse-server listens for HTTP on port 8123 (this can be changed in the config). If you make a GET / request without parameters, it returns the string "OK" (with a line feed at the end). You can use this request in health-check scripts.
By default, clickhouse-server listens for HTTP on port 8123 (this can be changed in the config). If you make a GET / request without parameters, it returns the string "Ok." (with a line feed at the end). You can use this request in health-check scripts.

View File

@ -3,7 +3,7 @@
The HTTP interface lets you use ClickHouse on any platform from any programming language. We use it from Java and Perl, as well as from shell scripts. In other departments, the HTTP interface is used from Perl, Python, and Go. The HTTP interface is more limited than the native TCP interface, but it has better compatibility.
By default, clickhouse-server listens for HTTP requests on port 8123 (this can be changed in the config).
If you send a GET / request without parameters, it returns the string "Ok" (with a line feed at the end). You can use it in health-check scripts.
If you send a GET / request without parameters, it returns the string "Ok." (with a line feed at the end). You can use it in health-check scripts.
```bash
$ curl 'http://localhost:8123/'
Ok.
```

View File

@ -23,7 +23,6 @@ add_library (common
src/getThreadNumber.cpp
src/sleep.cpp
src/argsToConfig.cpp
src/Pipe.cpp
src/phdr_cache.cpp
src/coverage.cpp
@ -47,7 +46,6 @@ add_library (common
include/common/setTerminalEcho.h
include/common/find_symbols.h
include/common/constexpr_helpers.h
include/common/Pipe.h
include/common/getThreadNumber.h
include/common/sleep.h
include/common/SimpleCache.h

View File

@ -1,34 +0,0 @@
#pragma once
#include <unistd.h>
#include <fcntl.h>
#include <stdexcept>
/**
* Struct containing a pipe with lazy initialization.
* Use the `open` and `close` methods to manipulate the pipe and the `fds_rw` field to access
* the pipe's file descriptors.
*/
struct LazyPipe
{
int fds_rw[2] = {-1, -1};
LazyPipe() = default;
void open();
void close();
virtual ~LazyPipe() = default;
};
/**
* Struct which opens a new pipe on creation and closes it on destruction.
* Use the `fds_rw` field to access the pipe's file descriptors.
*/
struct Pipe : public LazyPipe
{
Pipe();
~Pipe();
};

View File

@ -1,45 +0,0 @@
#include "common/Pipe.h"
void LazyPipe::open()
{
for (int & fd : fds_rw)
{
if (fd >= 0)
{
throw std::logic_error("Pipe is already opened");
}
}
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
throw std::runtime_error("Cannot create pipe");
#else
if (0 != pipe(fds_rw))
throw std::runtime_error("Cannot create pipe");
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe");
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe");
#endif
}
void LazyPipe::close()
{
for (int fd : fds_rw)
{
if (fd >= 0)
{
::close(fd);
}
}
}
Pipe::Pipe()
{
open();
}
Pipe::~Pipe()
{
close();
}

View File

@ -19,10 +19,12 @@
#include <Poco/Version.h>
#include <common/Types.h>
#include <common/logger_useful.h>
#include <common/getThreadNumber.h>
#include <daemon/GraphiteWriter.h>
#include <Common/Config/ConfigProcessor.h>
#include <loggers/Loggers.h>
namespace Poco { class TaskManager; }

View File

@ -1,5 +1,5 @@
#include <daemon/BaseDaemon.h>
#include <Common/Config/ConfigProcessor.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
@ -12,20 +12,15 @@
#include <unistd.h>
#include <typeinfo>
#include <common/logger_useful.h>
#include <common/ErrorHandlers.h>
#include <common/Pipe.h>
#include <Common/StackTrace.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <memory>
#include <Poco/Observer.h>
#include <Poco/AutoPtr.h>
#include <common/getThreadNumber.h>
#include <common/coverage.h>
#include <Poco/PatternFormatter.h>
#include <Poco/TaskManager.h>
#include <Poco/File.h>
@ -37,16 +32,25 @@
#include <Poco/Condition.h>
#include <Poco/SyslogChannel.h>
#include <Poco/DirectoryIterator.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <common/ErrorHandlers.h>
#include <common/argsToConfig.h>
#include <common/getThreadNumber.h>
#include <common/coverage.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/config_version.h>
#include <common/argsToConfig.h>
#ifdef __APPLE__
// ucontext is not available without _XOPEN_SOURCE
@ -55,7 +59,7 @@
#include <ucontext.h>
Pipe signal_pipe;
DB::PipeFDs signal_pipe;
/** Reset signal handler to the default and send signal to itself.
@ -68,8 +72,16 @@ static void call_default_signal_handler(int sig)
}
using ThreadNumber = decltype(getThreadNumber());
static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(StackTrace) + sizeof(ThreadNumber);
static constexpr size_t max_query_id_size = 127;
static const size_t buf_size =
sizeof(int)
+ sizeof(siginfo_t)
+ sizeof(ucontext_t)
+ sizeof(StackTrace)
+ sizeof(UInt32)
+ max_query_id_size + 1; /// query_id + varint encoded length
using signal_function = void(int, siginfo_t*, void*);
@ -93,9 +105,9 @@ static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * co
}
/** Handler for "fault" signals. Send data about fault to separate thread to write into log.
/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log.
*/
static void faultSignalHandler(int sig, siginfo_t * info, void * context)
static void signalHandler(int sig, siginfo_t * info, void * context)
{
char buf[buf_size];
DB::WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], buf_size, buf);
@ -103,11 +115,15 @@ static void faultSignalHandler(int sig, siginfo_t * info, void * context)
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
StringRef query_id = CurrentThread::getQueryId(); /// This is signal safe.
query_id.size = std::min(query_id.size, max_query_id_size);
DB::writeBinary(sig, out);
DB::writePODBinary(*info, out);
DB::writePODBinary(signal_context, out);
DB::writePODBinary(stack_trace, out);
DB::writeBinary(getThreadNumber(), out);
DB::writeBinary(UInt32(getThreadNumber()), out);
DB::writeStringBinary(query_id, out);
out.next();
@ -163,7 +179,7 @@ public:
}
else if (sig == Signals::StdTerminate)
{
ThreadNumber thread_num;
UInt32 thread_num;
std::string message;
DB::readBinary(thread_num, in);
@ -182,16 +198,18 @@ public:
siginfo_t info;
ucontext_t context;
StackTrace stack_trace(NoCapture{});
ThreadNumber thread_num;
UInt32 thread_num;
std::string query_id;
DB::readPODBinary(info, in);
DB::readPODBinary(context, in);
DB::readPODBinary(stack_trace, in);
DB::readBinary(thread_num, in);
DB::readBinary(query_id, in);
/// This allows receiving more signals if a failure happens inside the onFault function.
/// Example: a segfault while symbolizing a stack trace.
std::thread([=] { onFault(sig, info, context, stack_trace, thread_num); }).detach();
std::thread([=] { onFault(sig, info, context, stack_trace, thread_num, query_id); }).detach();
}
}
}
@ -201,16 +219,33 @@ private:
BaseDaemon & daemon;
private:
void onTerminate(const std::string & message, ThreadNumber thread_num) const
void onTerminate(const std::string & message, UInt32 thread_num) const
{
LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") " << message);
}
void onFault(int sig, const siginfo_t & info, const ucontext_t & context, const StackTrace & stack_trace, ThreadNumber thread_num) const
void onFault(
int sig,
const siginfo_t & info,
const ucontext_t & context,
const StackTrace & stack_trace,
UInt32 thread_num,
const std::string & query_id) const
{
LOG_FATAL(log, "########################################");
LOG_FATAL(log, "(version " << VERSION_STRING << VERSION_OFFICIAL << ") (from thread " << thread_num << ") "
<< "Received signal " << strsignal(sig) << " (" << sig << ")" << ".");
{
std::stringstream message;
message << "(version " << VERSION_STRING << VERSION_OFFICIAL << ")";
message << " (from thread " << thread_num << ")";
if (query_id.empty())
message << " (no query)";
else
message << " (query_id: " << query_id << ")";
message << " Received signal " << strsignal(sig) << " (" << sig << ")" << ".";
LOG_FATAL(log, message.rdbuf());
}
LOG_FATAL(log, signalToErrorMessage(sig, info, context));
@ -265,7 +300,7 @@ static void terminate_handler()
DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf);
DB::writeBinary(static_cast<int>(SignalListener::StdTerminate), out);
DB::writeBinary(getThreadNumber(), out);
DB::writeBinary(UInt32(getThreadNumber()), out);
DB::writeBinary(log_message, out);
out.next();
@ -723,7 +758,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
/// SIGTSTP is added for debugging purposes, to output a stack trace of any running thread at any time.
add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, faultSignalHandler);
add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, signalHandler);
add_signal_handler({SIGHUP, SIGUSR1}, closeLogsSignalHandler);
add_signal_handler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler);
@ -731,6 +766,9 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
static KillingErrorHandler killing_error_handler;
Poco::ErrorHandler::set(&killing_error_handler);
signal_pipe.setNonBlocking();
signal_pipe.tryIncreaseSize(1 << 20);
signal_listener.reset(new SignalListener(*this));
signal_listener_thread.start(*signal_listener);
}

View File

@ -27,10 +27,6 @@ if (GLIBC_COMPATIBILITY)
list(APPEND glibc_compatibility_sources musl/getentropy.c)
endif()
if(MAKE_STATIC_LIBRARIES)
list(APPEND glibc_compatibility_sources libcxxabi/cxa_thread_atexit.cpp)
endif()
# Need to omit frame pointers to match the performance of glibc
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fomit-frame-pointer")

Some files were not shown because too many files have changed in this diff.