Merge branch 'master' of github.com:ClickHouse/ClickHouse

Commit bd96252c74 by Sergei Shtykov, 2019-12-25 13:42:45 +03:00
128 changed files with 1301 additions and 774 deletions

contrib/libcxxabi (vendored submodule)

@ -1 +1 @@
Subproject commit c26cf36f8387c5edf2cabb4a630f0975c35aa9fb
Subproject commit 7aacd45028ecf5f1c39985ecbd4f67eed9b11ce5


@ -84,8 +84,6 @@ public:
{
this->data(place).result.insertResultInto(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -130,8 +130,6 @@ public:
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -87,8 +87,6 @@ public:
column.getData().push_back(this->data(place).template result<ResultType>());
}
const char * getHeaderFilePath() const override { return __FILE__; }
protected:
UInt32 scale;
};


@ -78,8 +78,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).value);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};


@ -154,11 +154,6 @@ public:
{
assert_cast<ColumnFloat64 &>(to).getData().push_back(getBoundingRatio(data(place)));
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}


@ -33,11 +33,6 @@ public:
return "categoricalInformationValue";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void create(AggregateDataPtr place) const override
{
memset(place, 0, sizeOfData());


@ -63,8 +63,6 @@ public:
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
/// Reset the state to specified value. This function is not the part of common interface.
void set(AggregateDataPtr place, UInt64 new_count)
{
@ -115,8 +113,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -145,8 +145,6 @@ public:
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -247,8 +247,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};


@ -136,8 +136,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -400,8 +398,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE


@ -203,8 +203,6 @@ public:
to_offsets.push_back(to_offsets.back() + result_array_size);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};


@ -192,8 +192,6 @@ public:
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE


@ -52,8 +52,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -119,8 +117,6 @@ public:
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
template <typename Data>


@ -118,8 +118,6 @@ public:
for (auto it = set.begin(); it != set.end(); ++it, ++i)
data_to[old_size + i] = it->getValue();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -255,8 +253,6 @@ public:
deserializeAndInsert(elem.getValue(), data_to);
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
template <>


@ -369,8 +369,6 @@ public:
offsets_to.push_back(to_tuple.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
String getName() const override { return "histogram"; }
};


@ -109,8 +109,6 @@ public:
{
return nested_func->isState();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -394,8 +394,6 @@ public:
this->data(place).returnWeights(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt64 param_num;
Float64 learning_rate;


@ -162,11 +162,6 @@ public:
result_column.push_back(position_of_max_intersections);
}
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}


@ -98,8 +98,6 @@ public:
{
return nested_func->allocatesMemoryInArena();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -731,8 +731,6 @@ public:
{
this->data(place).insertResultInto(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -71,8 +71,6 @@ public:
{
to.insertDefault();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -176,8 +176,6 @@ public:
{
return nested_function->isState();
}
const char * getHeaderFilePath() const override { return __FILE__; }
};


@ -49,11 +49,6 @@ public:
return nested_function->getName() + "OrDefault";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();


@ -179,8 +179,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
static void assertSecondArg(const DataTypes & types)
{
if constexpr (has_second_arg)


@ -72,11 +72,6 @@ public:
return nested_function->getName() + "Resample";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();


@ -144,11 +144,6 @@ public:
offsets_to.push_back(current_offset);
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}


@ -180,8 +180,6 @@ public:
this->data(place).deserialize(buf);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
enum class PatternActionType
{


@ -109,11 +109,6 @@ public:
return "simpleLinearRegression";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void add(
AggregateDataPtr place,
const IColumn ** columns,


@ -94,8 +94,6 @@ public:
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -147,8 +147,6 @@ public:
{
this->data(place).publish(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
/** Implementing the varSamp function.
@ -401,8 +399,6 @@ public:
{
this->data(place).publish(to);
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
/** Implementing the covarSamp function.


@ -552,8 +552,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 src_scale;
};


@ -147,8 +147,6 @@ public:
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 scale;
};


@ -261,8 +261,6 @@ public:
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
bool keepKey(const T & key) const { return static_cast<const Derived &>(*this).keepKey(key); }
};


@ -281,7 +281,5 @@ public:
}
bool allocatesMemoryInArena() const override { return true; }
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -103,8 +103,6 @@ public:
for (auto it = result_vec.begin(); it != result_vec.end(); ++it, ++i)
data_to[old_size + i] = it->key;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -230,8 +228,6 @@ public:
data_to.deserializeAndInsertFromArena(elem.key.data);
}
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -244,8 +244,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -300,8 +298,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}


@ -109,7 +109,7 @@ struct AggregateFunctionUniqCombinedData : public AggregateFunctionUniqCombinedD
};
/// For String keys, 64 bit hash is always used (both for uniqCombined and uniqCombined64),
/// For String keys, 64 bit hash is always used (both for uniqCombined and uniqCombined64),
/// because of backwards compatibility (64 bit hash was already used for uniqCombined).
template <UInt8 K, typename HashValueType>
struct AggregateFunctionUniqCombinedData<String, K, HashValueType> : public AggregateFunctionUniqCombinedDataWithKey<UInt64 /*always*/, K>
@ -171,11 +171,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
/** For multiple arguments. To compute, hashes them.
@ -238,11 +233,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}


@ -184,8 +184,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
@ -248,8 +246,6 @@ public:
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};


@ -245,11 +245,6 @@ public:
{
assert_cast<ColumnUInt8 &>(to).getData().push_back(getEventLevel(this->data(place)));
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}


@ -148,12 +148,6 @@ public:
addBatchArray(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const = 0;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
* const char * getHeaderFilePath() const override { return __FILE__; }
*/
virtual const char * getHeaderFilePath() const = 0;
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }


@ -476,6 +476,7 @@ namespace ErrorCodes
extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int CANNOT_SIGQUEUE = 502;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

dbms/src/Common/PipeFDs.cpp (new file, 112 lines)

@ -0,0 +1,112 @@
#include <Common/PipeFDs.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <algorithm>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PIPE;
extern const int CANNOT_FCNTL;
extern const int LOGICAL_ERROR;
}
void LazyPipeFDs::open()
{
for (int & fd : fds_rw)
if (fd >= 0)
throw Exception("Pipe is already opened", ErrorCodes::LOGICAL_ERROR);
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
#else
if (0 != pipe(fds_rw))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
throwFromErrno("Cannot setup auto-close on exec for read end of pipe", ErrorCodes::CANNOT_FCNTL);
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
throwFromErrno("Cannot setup auto-close on exec for write end of pipe", ErrorCodes::CANNOT_FCNTL);
#endif
}
void LazyPipeFDs::close()
{
for (int & fd : fds_rw)
{
if (fd < 0)
continue;
if (0 != ::close(fd))
throwFromErrno("Cannot close pipe", ErrorCodes::CANNOT_PIPE);
fd = -1;
}
}
PipeFDs::PipeFDs()
{
open();
}
LazyPipeFDs::~LazyPipeFDs()
{
try
{
close();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void LazyPipeFDs::setNonBlocking()
{
int flags = fcntl(fds_rw[1], F_GETFL, 0);
if (-1 == flags)
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
if (-1 == fcntl(fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
}
void LazyPipeFDs::tryIncreaseSize(int desired_size)
{
#if defined(OS_LINUX)
Poco::Logger * log = &Poco::Logger::get("Pipe");
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
int pipe_size = fcntl(fds_rw[1], F_GETPIPE_SZ);
if (-1 == pipe_size)
{
if (errno == EINVAL)
{
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
/// It will work nevertheless.
}
else
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
}
else
{
for (errno = 0; errno != EPERM && pipe_size < desired_size; pipe_size *= 2)
if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size)));
}
#else
(void)desired_size;
#endif
}
}

dbms/src/Common/PipeFDs.h (new file, 35 lines)

@ -0,0 +1,35 @@
#pragma once
#include <cstddef>
namespace DB
{
/** Struct containing a pipe with lazy initialization.
* Use `open` and `close` methods to manipulate pipe and `fds_rw` field to access
* pipe's file descriptors.
*/
struct LazyPipeFDs
{
int fds_rw[2] = {-1, -1};
void open();
void close();
void setNonBlocking();
void tryIncreaseSize(int desired_size);
~LazyPipeFDs();
};
/** Struct which opens new pipe on creation and closes it on destruction.
* Use `fds_rw` field to access pipe's file descriptors.
*/
struct PipeFDs : public LazyPipeFDs
{
PipeFDs();
};
}
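The header above only declares the interface. A minimal usage sketch, assuming it is compiled together with the PipeFDs.cpp added above (the message, buffer size and pipe capacity are illustrative):

#include <Common/PipeFDs.h>

#include <string>
#include <unistd.h>

/// Illustrative only: open the pipe lazily, tune it, then use the raw descriptors.
int main()
{
    DB::LazyPipeFDs pipe_fds;
    pipe_fds.open();                    /// throws DB::Exception if the pipe cannot be created
    pipe_fds.setNonBlocking();          /// write end will not block the writer
    pipe_fds.tryIncreaseSize(1 << 20);  /// best effort; logs and continues on kernels without F_GETPIPE_SZ

    const std::string msg = "hello";
    if (write(pipe_fds.fds_rw[1], msg.data(), msg.size()) < 0)
        return 1;

    char buf[16];
    if (read(pipe_fds.fds_rw[0], buf, sizeof(buf)) < 0)
        return 1;

    pipe_fds.close();                   /// also called by the destructor (exceptions are logged, not thrown)
}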


@ -1,12 +1,12 @@
#include "QueryProfiler.h"
#include <random>
#include <common/Pipe.h>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <Common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/thread_local_rng.h>
@ -22,7 +22,7 @@ namespace ProfileEvents
namespace DB
{
extern LazyPipe trace_pipe;
extern LazyPipeFDs trace_pipe;
namespace
{


@ -4,11 +4,11 @@
#include <dlfcn.h>
#include <Common/Exception.h>
#include <Common/ShellCommand.h>
#include <Common/PipeFDs.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include <port/unistd.h>
#include <csignal>
#include <common/Pipe.h>
namespace
{
@ -66,9 +66,9 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, c
if (!real_vfork)
throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM);
Pipe pipe_stdin;
Pipe pipe_stdout;
Pipe pipe_stderr;
PipeFDs pipe_stdin;
PipeFDs pipe_stdout;
PipeFDs pipe_stderr;
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();


@ -24,7 +24,7 @@ public:
/// Whether the current process has permissions (sudo or cap_net_admin capabilties) to get taskstats info
static bool checkPermissions();
#if defined(__linux__)
#if defined(OS_LINUX)
private:
int netlink_socket_fd = -1;
UInt16 taskstats_family_id = 0;


@ -2,7 +2,7 @@
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
@ -19,13 +19,12 @@
namespace DB
{
LazyPipe trace_pipe;
LazyPipeFDs trace_pipe;
namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
extern const int CANNOT_FCNTL;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
@ -40,36 +39,8 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0);
if (-1 == flags)
throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
#if defined(OS_LINUX)
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ);
if (-1 == pipe_size)
{
if (errno == EINVAL)
{
LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
/// It will work nevertheless.
}
else
throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
}
else
{
constexpr int max_pipe_capacity_to_set = 1048576;
for (errno = 0; errno != EPERM && pipe_size < max_pipe_capacity_to_set; pipe_size *= 2)
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set)));
}
#endif
trace_pipe.setNonBlocking();
trace_pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}


@ -385,6 +385,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(SettingUInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\


@ -344,16 +344,8 @@ void DatabaseOrdinary::alterTable(
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
if (ast_create_query.columns_list->indices)
ast_create_query.columns_list->replace(ast_create_query.columns_list->indices, new_indices);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->indices, new_indices);
if (ast_create_query.columns_list->constraints)
ast_create_query.columns_list->replace(ast_create_query.columns_list->constraints, new_constraints);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->constraints, new_constraints);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);


@ -76,6 +76,9 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
/// Processors are not supported here yet.
context.getSettingsRef().experimental_use_processors = false;
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context.makeQueryContext();
}
@ -100,6 +103,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{other.load_all_query}
{
context.makeQueryContext();
}
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()


@ -83,6 +83,10 @@ public:
{
abort();
}
else if (mode == "std::terminate")
{
std::terminate();
}
else if (mode == "use after free")
{
int * x_ptr;


@ -46,7 +46,14 @@ BrotliWriteBuffer::BrotliWriteBuffer(WriteBuffer & out_, int compression_level,
BrotliWriteBuffer::~BrotliWriteBuffer()
{
finish();
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void BrotliWriteBuffer::nextImpl()


@ -8,10 +8,6 @@ WriteBufferFromFileBase::WriteBufferFromFileBase(size_t buf_size, char * existin
{
}
WriteBufferFromFileBase::~WriteBufferFromFileBase()
{
}
off_t WriteBufferFromFileBase::seek(off_t off, int whence)
{
return doSeek(off, whence);


@ -13,12 +13,12 @@ class WriteBufferFromFileBase : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
virtual ~WriteBufferFromFileBase();
~WriteBufferFromFileBase() override = default;
off_t seek(off_t off, int whence = SEEK_SET);
void truncate(off_t length = 0);
virtual off_t getPositionInFile() = 0;
virtual void sync() = 0;
void sync() override = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;


@ -89,8 +89,14 @@ public:
~WriteBufferFromVector() override
{
if (!is_finished)
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};


@ -133,4 +133,17 @@ void WriteBufferValidUTF8::finish()
putReplacement();
}
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
try
{
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
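The same pattern is applied to several destructors in this commit (BrotliWriteBuffer, WriteBufferFromVector, WriteBufferValidUTF8): the flushing work stays in finish(), and the destructor only calls it inside try/catch so no exception can escape a destructor. A minimal standalone sketch of the idea, with illustrative names rather than the real ClickHouse classes:

#include <iostream>

class Writer
{
public:
    void finish()
    {
        if (finished)
            return;
        flushPendingData();   /// may throw; callers that care should call finish() explicitly
        finished = true;
    }

    ~Writer()
    {
        try
        {
            finish();         /// safety net for callers that forgot to call finish()
        }
        catch (...)
        {
            std::cerr << "Writer::~Writer: exception during finish, ignored\n";
        }
    }

private:
    void flushPendingData() { /* write buffered bytes to the underlying sink */ }
    bool finished = false;
};

int main()
{
    Writer w;
    w.finish();   /// preferred path: errors surface as exceptions here, not in the destructor
}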


@ -35,10 +35,7 @@ public:
const char * replacement_ = "\xEF\xBF\xBD",
size_t size = DEFAULT_SIZE);
virtual ~WriteBufferValidUTF8() override
{
finish();
}
~WriteBufferValidUTF8() override;
};
}


@ -98,14 +98,6 @@ NameSet AnalyzedJoin::getQualifiedColumnsSet() const
return out;
}
NameSet AnalyzedJoin::getOriginalColumnsSet() const
{
NameSet out;
for (const auto & names : original_names)
out.insert(names.second);
return out;
}
NamesWithAliases AnalyzedJoin::getNamesWithAliases(const NameSet & required_columns) const
{
NamesWithAliases out;


@ -96,7 +96,6 @@ public:
bool hasOn() const { return table_join.on_expression != nullptr; }
NameSet getQualifiedColumnsSet() const;
NameSet getOriginalColumnsSet() const;
NamesWithAliases getNamesWithAliases(const NameSet & required_columns) const;
NamesWithAliases getRequiredColumns(const Block & sample, const Names & action_columns) const;


@ -170,11 +170,22 @@ size_t CollectJoinOnKeysMatcher::getTableForIdentifiers(std::vector<const ASTIde
if (!membership)
{
const String & name = identifier->name;
bool in_left_table = data.source_columns.count(name);
bool in_right_table = data.joined_columns.count(name);
bool in_left_table = data.left_table.hasColumn(name);
bool in_right_table = data.right_table.hasColumn(name);
if (in_left_table && in_right_table)
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
{
/// Relax ambiguous check for multiple JOINs
if (auto original_name = IdentifierSemantic::uncover(*identifier))
{
auto match = IdentifierSemantic::canReferColumnToTable(*original_name, data.right_table.table);
if (match == IdentifierSemantic::ColumnMatch::NoMatch)
in_right_table = false;
in_left_table = !in_right_table;
}
else
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
}
if (in_left_table)
membership = 1;


@ -3,6 +3,7 @@
#include <Core/Names.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/Aliases.h>
@ -25,8 +26,8 @@ public:
struct Data
{
AnalyzedJoin & analyzed_join;
const NameSet & source_columns;
const NameSet & joined_columns;
const TableWithColumnNames & left_table;
const TableWithColumnNames & right_table;
const Aliases & aliases;
const bool is_asof{false};
ASTPtr asof_left_key{};


@ -53,6 +53,20 @@ struct TableWithColumnNames
for (auto & column : addition)
hidden_columns.push_back(column.name);
}
bool hasColumn(const String & name) const
{
if (columns_set.empty())
{
columns_set.insert(columns.begin(), columns.end());
columns_set.insert(hidden_columns.begin(), hidden_columns.end());
}
return columns_set.count(name);
}
private:
mutable NameSet columns_set;
};
std::vector<DatabaseAndTableWithAlias> getDatabaseAndTables(const ASTSelectQuery & select_query, const String & current_database);
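hasColumn() above fills its lookup set lazily on first use; because the method is const, the cached set is declared mutable. A minimal standalone sketch of that pattern with illustrative types (not the real TableWithColumnNames):

#include <string>
#include <unordered_set>
#include <vector>

struct TableColumns
{
    std::vector<std::string> columns;
    std::vector<std::string> hidden_columns;

    bool hasColumn(const std::string & name) const
    {
        if (lookup.empty())   /// filled once; like the original, this lazy fill is not thread-safe
        {
            lookup.insert(columns.begin(), columns.end());
            lookup.insert(hidden_columns.begin(), hidden_columns.end());
        }
        return lookup.count(name) != 0;
    }

private:
    /// mutable so the cache can be populated from the const accessor above.
    mutable std::unordered_set<std::string> lookup;
};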


@ -92,6 +92,22 @@ std::optional<String> IdentifierSemantic::getTableName(const ASTPtr & ast)
return {};
}
std::optional<ASTIdentifier> IdentifierSemantic::uncover(const ASTIdentifier & identifier)
{
if (identifier.semantic->covered)
{
std::vector<String> name_parts = identifier.name_parts;
return ASTIdentifier(std::move(name_parts));
}
return {};
}
void IdentifierSemantic::coverName(ASTIdentifier & identifier, const String & alias)
{
identifier.setShortName(alias);
identifier.semantic->covered = true;
}
bool IdentifierSemantic::canBeAlias(const ASTIdentifier & identifier)
{
return identifier.semantic->can_be_alias;


@ -12,6 +12,7 @@ struct IdentifierSemanticImpl
{
bool special = false; /// for now it's 'not a column': tables, subselects and some special stuff like FORMAT
bool can_be_alias = true; /// if it's a cropped name it could not be an alias
bool covered = false; /// real (compound) name is hidden by an alias (short name)
std::optional<size_t> membership; /// table position in join
};
@ -43,6 +44,8 @@ struct IdentifierSemantic
static void setColumnLongName(ASTIdentifier & identifier, const DatabaseAndTableWithAlias & db_and_table);
static bool canBeAlias(const ASTIdentifier & identifier);
static void setMembership(ASTIdentifier &, size_t table_no);
static void coverName(ASTIdentifier &, const String & alias);
static std::optional<ASTIdentifier> uncover(const ASTIdentifier & identifier);
static std::optional<size_t> getMembership(const ASTIdentifier & identifier);
static bool chooseTable(const ASTIdentifier &, const std::vector<DatabaseAndTableWithAlias> & tables, size_t & best_table_pos,
bool ambiguous = false);


@ -21,7 +21,6 @@
#include <Parsers/parseQuery.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
@ -36,7 +35,6 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
@ -97,29 +95,20 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
throw Exception("Database " + database_name + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
String database_engine_name;
if (!create.storage)
{
database_engine_name = "Ordinary"; /// Default database engine.
auto engine = std::make_shared<ASTFunction>();
engine->name = database_engine_name;
auto storage = std::make_shared<ASTStorage>();
engine->name = "Ordinary";
storage->set(storage->engine, engine);
create.set(create.storage, storage);
}
else
else if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
const ASTStorage & storage = *create.storage;
const ASTFunction & engine = *storage.engine;
/// Currently, there are no database engines, that support any arguments.
if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{
std::stringstream ostr;
formatAST(storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
database_engine_name = engine.name;
std::stringstream ostr;
formatAST(*create.storage, ostr, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
String database_name_escaped = escapeForFileName(database_name);
@ -155,19 +144,27 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
out.close();
}
bool added = false;
bool renamed = false;
try
{
context.addDatabase(database_name, database);
added = true;
if (need_write_metadata)
{
Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path);
renamed = true;
}
database->loadStoredObjects(context, has_force_restore_data_flag);
}
catch (...)
{
if (need_write_metadata)
if (renamed)
Poco::File(metadata_file_tmp_path).remove();
if (added)
context.detachDatabase(database_name);
throw;
}
@ -396,84 +393,97 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
}
ColumnsDescription InterpreterCreateQuery::setProperties(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
TableProperties properties;
TableStructureReadLockHolder as_storage_lock;
if (create.columns_list)
{
if (create.columns_list->columns)
columns = getColumnsDescription(*create.columns_list->columns, context);
properties.columns = getColumnsDescription(*create.columns_list->columns, context);
if (create.columns_list->indices)
for (const auto & index : create.columns_list->indices->children)
indices.indices.push_back(
properties.indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (create.columns_list->constraints)
for (const auto & constraint : create.columns_list->constraints->children)
constraints.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
properties.constraints = getConstraintsDescription(create.columns_list->constraints);
}
else if (!create.as_table.empty())
{
columns = as_storage->getColumns();
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
StoragePtr as_storage = context.getTable(as_database_name, create.as_table);
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines.
/// We should not copy them for other storages.
if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
indices = as_storage->getIndices();
properties.indices = as_storage->getIndices();
constraints = as_storage->getConstraints();
properties.constraints = as_storage->getConstraints();
}
else if (create.select)
{
columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
Block as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (create.as_table_function)
return {};
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(columns);
ASTPtr new_indices = formatIndices(indices);
ASTPtr new_constraints = formatConstraints(constraints);
if (!create.columns_list)
{
auto new_columns_list = std::make_shared<ASTColumns>();
create.set(create.columns_list, new_columns_list);
}
create.set(create.columns_list, std::make_shared<ASTColumns>());
if (create.columns_list->columns)
create.columns_list->replace(create.columns_list->columns, new_columns);
else
create.columns_list->set(create.columns_list->columns, new_columns);
ASTPtr new_columns = formatColumns(properties.columns);
ASTPtr new_indices = formatIndices(properties.indices);
ASTPtr new_constraints = formatConstraints(properties.constraints);
if (new_indices && create.columns_list->indices)
create.columns_list->replace(create.columns_list->indices, new_indices);
else if (new_indices)
create.columns_list->set(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->columns, new_columns);
create.columns_list->setOrReplace(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->constraints, new_constraints);
if (new_constraints && create.columns_list->constraints)
create.columns_list->replace(create.columns_list->constraints, new_constraints);
else if (new_constraints)
create.columns_list->set(create.columns_list->constraints, new_constraints);
validateTableStructure(create, properties);
/// Set the table engine if it was not specified explicitly.
setEngine(create);
return properties;
}
void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties) const
{
/// Check for duplicates
std::set<String> all_columns;
for (const auto & column : columns)
for (const auto & column : properties.columns)
{
if (!all_columns.emplace(column.name).second)
throw Exception("Column " + backQuoteIfNeed(column.name) + " already exists", ErrorCodes::DUPLICATE_COLUMN);
}
return columns;
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : properties.columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default "
"due to expected negative impact on performance. "
"It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
@ -535,23 +545,19 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
throw Exception("Temporary tables cannot be inside a database. You should not specify a database for a temporary table.",
ErrorCodes::BAD_DATABASE_FOR_TEMPORARY_TABLE);
String path = context.getPath();
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns_list)
{
// Table SQL definition is available even if the table is detached
auto query = context.getCreateTableQuery(database_name, table_name);
auto query = context.getCreateTableQuery(create.database, create.table);
create = query->as<ASTCreateQuery &>(); // Copy the saved create query, but use ATTACH instead of CREATE
create.attach = true;
}
if (create.to_database.empty())
String current_database = context.getCurrentDatabase();
if (!create.temporary && create.database.empty())
create.database = current_database;
if (!create.to_table.empty() && create.to_database.empty())
create.to_database = current_database;
if (create.select && (create.is_view || create.is_materialized_view || create.is_live_view))
@ -560,26 +566,63 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.select);
}
Block as_select_sample;
if (create.select && (!create.attach || !create.columns_list))
as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = setProperties(create);
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
/// Actually creates table
bool created = doCreateTable(create, properties);
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
if (!created) /// Table already exists
return {};
if (!as_table_name.empty())
return fillTableIfNeeded(create);
}
bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties)
{
std::unique_ptr<DDLGuard> guard;
String data_path;
DatabasePtr database;
const String & table_name = create.table;
bool need_add_to_database = !create.temporary || create.is_live_view;
if (need_add_to_database)
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
database = context.getDatabase(create.database);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If the table doesn't exist, one thread is creating the table, while others wait in DDLGuard.
*/
guard = context.getDDLGuard(create.database, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
/// TODO Check structure of table
if (create.if_not_exists)
return false;
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = create.database;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return false;
ColumnsDescription columns;
ConstraintsDescription constraints;
StoragePtr res;
if (create.as_table_function)
{
const auto & table_function = create.as_table_function->as<ASTFunction &>();
@ -588,99 +631,38 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
else
{
/// Set and retrieve list of columns.
columns = setProperties(create, as_select_sample, as_storage);
constraints = getConstraintsDescription(create.columns_list->constraints);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
res = StorageFactory::instance().get(create,
data_path,
table_name,
create.database,
context,
context.getGlobalContext(),
properties.columns,
properties.constraints,
create.attach,
false);
}
{
std::unique_ptr<DDLGuard> guard;
if (need_add_to_database)
database->createTable(context, table_name, res, query_ptr);
else
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
String data_path;
DatabasePtr database;
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
if (!create.temporary || create.is_live_view)
{
database = context.getDatabase(database_name);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If table doesnt exist, one thread is creating table, while others wait in DDLGuard.
*/
guard = context.getDDLGuard(database_name, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
/// TODO Check structure of table
if (create.if_not_exists)
return {};
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = database_name;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
if (!create.as_table_function)
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
constraints,
create.attach,
false);
}
if (create.temporary && !create.is_live_view)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
res->startup();
}
res->startup();
return true;
}
BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
{
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_view && !create.is_live_view && (!create.is_materialized_view || create.is_populate))
@ -688,9 +670,9 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto insert = std::make_shared<ASTInsertQuery>();
if (!create.temporary)
insert->database = database_name;
insert->database = create.database;
insert->table = table_name;
insert->table = create.table;
insert->select = create.select->clone();
if (create.temporary && !context.getSessionContext().hasQueryContext())


@ -49,15 +49,28 @@ public:
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
private:
struct TableProperties
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
};
BlockIO createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create);
BlockIO createDictionary(ASTCreateQuery & create);
/// Calculate list of columns, constraints, indices, etc... of table and return columns.
ColumnsDescription setProperties(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties setProperties(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void setEngine(ASTCreateQuery & create) const;
void checkAccess(const ASTCreateQuery & create);
/// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false.
bool doCreateTable(const ASTCreateQuery & create, const TableProperties & properties);
/// Inserts data in created table if it's CREATE ... SELECT
BlockIO fillTableIfNeeded(const ASTCreateQuery & create);
ASTPtr query_ptr;
Context & context;


@ -159,7 +159,7 @@ struct ColumnAliasesMatcher
aliases[alias] = long_name;
rev_aliases[long_name].push_back(alias);
identifier->setShortName(alias);
IdentifierSemantic::coverName(*identifier, alias);
if (is_public)
{
identifier->setAlias(long_name);
@ -177,7 +177,7 @@ struct ColumnAliasesMatcher
if (is_public && allowed_long_names.count(long_name))
; /// leave original name unchanged for correct output
else
identifier->setShortName(it->second[0]);
IdentifierSemantic::coverName(*identifier, it->second[0]);
}
}
}
@ -229,7 +229,7 @@ struct ColumnAliasesMatcher
if (!last_table)
{
node.setShortName(alias);
IdentifierSemantic::coverName(node, alias);
node.setAlias("");
}
}


@ -532,8 +532,8 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
}
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
const Aliases & aliases)
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query,
const std::vector<TableWithColumnNames> & tables, const Aliases & aliases)
{
const ASTTablesInSelectQueryElement * node = select_query.join();
if (!node)
@ -551,7 +551,7 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
{
bool is_asof = (table_join.strictness == ASTTableJoin::Strictness::Asof);
CollectJoinOnKeysVisitor::Data data{analyzed_join, source_columns, analyzed_join.getOriginalColumnsSet(), aliases, is_asof};
CollectJoinOnKeysVisitor::Data data{analyzed_join, tables[0], tables[1], aliases, is_asof};
CollectJoinOnKeysVisitor(data).visit(table_join.on_expression);
if (!data.has_some)
throw Exception("Cannot get JOIN keys from JOIN ON section: " + queryToString(table_join.on_expression),
@ -820,6 +820,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
if (storage)
collectSourceColumns(storage->getColumns(), result.source_columns, (select_query != nullptr));
NameSet source_columns_set = removeDuplicateColumns(result.source_columns);
std::vector<TableWithColumnNames> tables_with_columns;
if (select_query)
{
@ -837,7 +838,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
}
std::vector<const ASTTableExpression *> table_expressions = getTableExpressions(*select_query);
auto tables_with_columns = getTablesWithColumns(table_expressions, context);
tables_with_columns = getTablesWithColumns(table_expressions, context);
if (tables_with_columns.empty())
{
@ -935,7 +936,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
setJoinStrictness(*select_query, settings.join_default_strictness, settings.any_join_distinct_right_table_keys,
result.analyzed_join->table_join);
collectJoinedColumns(*result.analyzed_join, *select_query, source_columns_set, result.aliases);
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
}
result.aggregates = getAggregates(query);


@ -197,6 +197,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
memory_tracker.setParent(nullptr);
query_id.clear();
query_context = nullptr;
thread_group.reset();


@ -146,6 +146,15 @@ public:
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
}
template <typename T>
void setOrReplace(T * & field, const ASTPtr & child)
{
if (field)
replace(field, child);
else
set(field, child);
}
/// Convert to a string.
/// Format settings.


@ -176,7 +176,12 @@ public:
/// Check can output.
if (output.isFinished())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (!output.canPush())
return Status::PortFull;


@ -134,7 +134,10 @@ IProcessor::Status MergingSortedTransform::prepare()
auto chunk = input.pull();
if (!chunk.hasRows())
{
all_inputs_has_data = false;
if (!input.isFinished())
all_inputs_has_data = false;
continue;
}
@ -176,13 +179,14 @@ IProcessor::Status MergingSortedTransform::prepare()
return Status::NeedData;
auto chunk = input.pull();
if (!chunk.hasRows())
if (!chunk.hasRows() && !input.isFinished())
return Status::NeedData;
updateCursor(std::move(chunk), next_input_to_read);
pushToQueue(next_input_to_read);
need_data = false;
}
need_data = false;
}
return Status::Ready;


@ -80,11 +80,15 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, const std::string & name_, const ConnectionPoolPtr & pool_, ActionBlocker & monitor_blocker_)
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
/// It's important to initialize members before `thread` to avoid race.
: storage(storage_)
, pool{pool_}
, name{name_}
, pool(std::move(pool_))
, name(std::move(name_))
, path{storage.path + name + '/'}
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
@ -92,10 +96,6 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
{
const Settings & settings = storage.global_context.getSettingsRef();
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
min_batched_block_size_rows = settings.min_insert_block_size_rows;
min_batched_block_size_bytes = settings.min_insert_block_size_bytes;
}
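The constructor above now initializes every member in the init-list, per the comment about initializing members before `thread`: in C++, members are constructed in declaration order, so a thread member declared last sees the other members fully initialized. A minimal sketch of the point with illustrative types and paths (not the real monitor class):

#include <iostream>
#include <string>
#include <thread>

struct Monitor
{
    /// Plain members first: fully initialized before `worker` below starts running.
    std::string path;
    bool batch_inserts;

    /// Initialized last, so its lambda can safely read `path` and `batch_inserts`.
    /// Declaring the thread earlier, or assigning those members in the constructor body
    /// (as the old code did with the settings), would let the thread observe them unset.
    std::thread worker;

    Monitor(std::string path_, bool batch_inserts_)
        : path(std::move(path_))
        , batch_inserts(batch_inserts_)
        , worker([this] { std::cout << "monitoring " << path << ", batch=" << batch_inserts << '\n'; })
    {
    }

    ~Monitor() { worker.join(); }
};

int main()
{
    Monitor monitor("/var/lib/clickhouse/data/default/dist/shard1/", true);
}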


@ -20,7 +20,7 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, const std::string & name_, const ConnectionPoolPtr & pool_, ActionBlocker & monitor_blocker_);
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
~StorageDistributedDirectoryMonitor();
@ -44,22 +44,22 @@ private:
std::string getLoggerName() const;
StorageDistributed & storage;
ConnectionPoolPtr pool;
std::string name;
const ConnectionPoolPtr pool;
const std::string name;
std::string path;
bool should_batch_inserts = false;
size_t min_batched_block_size_rows = 0;
size_t min_batched_block_size_bytes = 0;
const bool should_batch_inserts = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;
String current_batch_file_path;
struct BatchHeader;
struct Batch;
size_t error_count{};
std::chrono::milliseconds default_sleep_time;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
std::chrono::milliseconds max_sleep_time;
const std::chrono::milliseconds max_sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
std::atomic<bool> quit {false};
std::mutex mutex;


@ -39,7 +39,7 @@ void StorageFactory::registerStorage(const std::string & name, Creator creator)
StoragePtr StorageFactory::get(
ASTCreateQuery & query,
const ASTCreateQuery & query,
const String & data_path,
const String & table_name,
const String & database_name,


@ -47,7 +47,7 @@ public:
using Creator = std::function<StoragePtr(const Arguments & arguments)>;
StoragePtr get(
ASTCreateQuery & query,
const ASTCreateQuery & query,
const String & data_path,
const String & table_name,
const String & database_name,


@ -425,17 +425,18 @@ public:
};
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
/// Choose any disk, because when we load mutations we search them at each disk
/// where storage can be placed. See loadMutations().
auto disk = storage_policy->getAnyDisk();
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
String file_name;
Int64 version;
{
std::lock_guard lock(currently_processing_in_background_mutex);
Int64 version = increment.get();
version = increment.get();
entry.commit(version);
file_name = entry.file_name;
auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry));
@ -444,6 +445,16 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
LOG_INFO(log, "Added mutation: " << file_name);
merging_mutating_task_handle->wake();
/// We have to wait for the mutation to finish
if (query_context.getSettingsRef().mutations_sync > 0)
{
LOG_INFO(log, "Waiting mutation: " << file_name);
auto check = [version, this]() { return isMutationDone(version); };
std::unique_lock lock(mutation_wait_mutex);
mutation_wait_event.wait(lock, check);
}
}
namespace
@ -462,6 +473,21 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
}
bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
/// Killed
if (!current_mutations_by_version.count(mutation_version))
return true;
auto data_parts = getDataPartsVector();
for (const auto & data_part : data_parts)
if (data_part->info.getDataVersion() < mutation_version)
return false;
return true;
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_processing_in_background_mutex);
@ -535,6 +561,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
mutation_wait_event.notify_all();
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
merging_mutating_task_handle->wake();
@ -771,6 +798,9 @@ bool StorageMergeTree::tryMutatePart()
renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});
/// Notify everyone who waits for this or previous mutations
mutation_wait_event.notify_all();
}
catch (...)
{


@ -79,6 +79,10 @@ public:
private:
/// Mutex and condvar for synchronous mutations wait
std::mutex mutation_wait_mutex;
std::condition_variable mutation_wait_event;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
@ -138,6 +142,8 @@ private:
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
/// Just checks versions of each active data part
bool isMutationDone(Int64 mutation_version) const;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;
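The pieces above (mutation_wait_mutex, mutation_wait_event, isMutationDone, and the notify_all calls in tryMutatePart and killMutation) implement the synchronous wait behind the new mutations_sync setting. A minimal standalone sketch of that wait/notify shape, with illustrative names rather than the real StorageMergeTree API:

#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <mutex>

class MutationTracker
{
public:
    /// Background path: called when all parts reached `version` or the mutation was killed.
    void markDoneUpTo(int64_t version)
    {
        {
            std::lock_guard lock(mutex);
            done_up_to = std::max(done_up_to, version);
        }
        event.notify_all();   /// every waiter wakes up and re-checks its own predicate
    }

    /// Query path (mutations_sync > 0): block until this mutation version is covered.
    void waitFor(int64_t version)
    {
        std::unique_lock lock(mutex);
        event.wait(lock, [&] { return done_up_to >= version; });
    }

private:
    std::mutex mutex;
    std::condition_variable event;
    int64_t done_up_to = 0;
};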


@ -54,6 +54,7 @@
#include <thread>
#include <future>
#include <boost/algorithm/string/join.hpp>
namespace ProfileEvents
{
@ -309,6 +310,78 @@ bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper()
}
void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const
{
if (replicas.empty())
return;
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
std::set<String> inactive_replicas;
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply mutation " + mutation_id);
while (!partial_shutdown_called)
{
/// The mutation may have been killed, or the whole replica may have been deleted.
/// In that case the wait event will unblock.
Coordination::Stat exists_stat;
if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event))
{
LOG_WARNING(log, "Mutation " << mutation_id << " was killed or manually removed. Nothing to wait.");
return;
}
auto zookeeper = getZooKeeper();
/// Replica could be inactive.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
LOG_WARNING(log, "Replica " << replica << " is not active during mutation. "
"Mutation will be done asynchronously when replica becomes active.");
inactive_replicas.emplace(replica);
break;
}
String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer";
std::string mutation_pointer_value;
Coordination::Stat get_stat;
/// Replica could be removed
if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, &get_stat, wait_event))
{
LOG_WARNING(log, replica << " was removed");
break;
}
else if (mutation_pointer_value >= mutation_id) /// Maybe we have already processed a more recent mutation
break; /// (numbers like 0000000000 and 0000000001)
/// We wait without timeout.
wait_event->wait();
}
if (partial_shutdown_called)
throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",
ErrorCodes::UNFINISHED);
}
if (!inactive_replicas.empty())
{
std::stringstream exception_message;
exception_message << "Mutation is not finished because some replicas are inactive right now: " << boost::algorithm::join(inactive_replicas, ", ");
exception_message << ". Mutation will be done asynchronously";
throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
}
}
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto zookeeper = getZooKeeper();
@ -3200,6 +3273,7 @@ void StorageReplicatedMergeTree::alter(
int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
};
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
{
@ -3294,6 +3368,10 @@ void StorageReplicatedMergeTree::alter(
time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
/// This code is quite similar to waitMutationToFinishOnReplicas
/// but contains more complicated details (version manipulations, multiple nodes, etc.).
/// It will be removed soon in favor of the alter-modify implementation on top of mutations.
/// TODO (alesap)
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
@ -3396,8 +3474,16 @@ void StorageReplicatedMergeTree::alter(
if (replica_nodes_changed_concurrently)
continue;
/// Now wait for replica nodes to change.
/// alter_query_event is subscribed via ZooKeeper watch callbacks to the /replicas/{replica}/metadata
/// and /replicas/{replica}/columns nodes of the current replica, plus the shared /columns and /metadata
/// nodes, which are common for all replicas. If any of these nodes changes (delete, set or create),
/// the event is notified and the wait is interrupted.
///
/// ReplicatedMergeTreeAlterThread is responsible for the local /replicas/{replica}/metadata and
/// /replicas/{replica}/columns changes. The shared /columns and /metadata nodes can be changed by a *newer*
/// concurrent alter from another replica. That alter updates the shared nodes first, so we have no
/// way to tell that our *current* alter has finished. So we cannot do anything better than just
/// return from the *current* alter with a success result.
if (!replication_alter_columns_timeout)
{
alter_query_event->wait();
@ -4399,7 +4485,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
}
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context &)
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
/// Overview of the mutation algorithm.
///
@ -4502,6 +4588,19 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
else
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
/// We have to wait for the mutation to finish
if (query_context.getSettingsRef().mutations_sync != 0)
{
Strings replicas;
if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas
replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for our own replica
replicas.push_back(replica_path);
waitMutationToFinishOnReplicas(replicas, entry.znode_name);
}
}
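A hedged usage sketch of the replicated case (table name and ZooKeeper path are illustrative): mutations_sync = 2 makes the ALTER wait until every replica under /replicas has applied the mutation, while mutations_sync = 1 waits only for the replica that received the query; if some replica is inactive, the query throws UNFINISHED as in waitMutationToFinishOnReplicas above.
CREATE TABLE sync_mutation_r1 (k UInt32, v UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/sync_mutation_example', 'r1') ORDER BY k;
ALTER TABLE sync_mutation_r1 UPDATE v = v + 1 WHERE 1 SETTINGS mutations_sync = 2;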
std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const

View File

@ -532,6 +532,10 @@ private:
/// return true if it's fixed
bool checkFixedGranualrityInZookeeper();
/// Wait until the mutation is finished on the given replicas
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/

View File

@ -0,0 +1,219 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include <signal.h>
#include <poll.h>
#include <mutex>
#include <filesystem>
#include <ext/scope_guard.h>
#include <Storages/System/StorageSystemStackTrace.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <Common/PipeFDs.h>
#include <common/getThreadNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SIGQUEUE;
extern const int CANNOT_MANIPULATE_SIGSET;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int LOGICAL_ERROR;
}
namespace
{
const pid_t expected_pid = getpid();
const int sig = SIGRTMIN;
int sequence_num = 0; /// For messages sent via pipe.
UInt32 thread_number{0};
std::optional<StackTrace> stack_trace;
static constexpr size_t max_query_id_size = 128;
char query_id_data[max_query_id_size];
size_t query_id_size = 0;
LazyPipeFDs notification_pipe;
void signalHandler(int, siginfo_t * info, void * context)
{
/// In case a malicious user is sending signals manually (for an unknown reason).
/// If we don't check, it may break our synchronization.
if (info->si_pid != expected_pid)
return;
/// Signal received too late.
if (info->si_value.sival_int != sequence_num)
return;
/// All these methods are signal-safe.
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
stack_trace.emplace(signal_context);
thread_number = getThreadNumber();
StringRef query_id = CurrentThread::getQueryId();
query_id_size = std::min(query_id.size, max_query_id_size);
memcpy(query_id_data, query_id.data, query_id_size);
int notification_num = info->si_value.sival_int;
ssize_t res = ::write(notification_pipe.fds_rw[1], &notification_num, sizeof(notification_num));
/// We cannot do anything if write failed.
(void)res;
}
/// Wait for data in pipe and read it.
bool wait(int timeout_ms)
{
while (true)
{
int fd = notification_pipe.fds_rw[0];
pollfd poll_fd{fd, POLLIN, 0};
int poll_res = poll(&poll_fd, 1, timeout_ms);
if (poll_res < 0)
{
if (errno == EINTR)
{
--timeout_ms; /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting.
if (timeout_ms == 0)
return false;
continue;
}
throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (poll_res == 0)
return false;
int notification_num = 0;
ssize_t read_res = ::read(fd, &notification_num, sizeof(notification_num));
if (read_res < 0)
{
if (errno == EINTR)
continue;
throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (read_res == sizeof(notification_num))
{
if (notification_num == sequence_num)
return true;
else
continue; /// Drain delayed notifications.
}
throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
}
}
}
StorageSystemStackTrace::StorageSystemStackTrace(const String & name_)
: IStorageSystemOneBlock<StorageSystemStackTrace>(name_)
{
notification_pipe.open();
/// Setup signal handler.
struct sigaction sa{};
sa.sa_sigaction = signalHandler;
sa.sa_flags = SA_SIGINFO;
if (sigemptyset(&sa.sa_mask))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaddset(&sa.sa_mask, sig))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaction(sig, &sa, nullptr))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
}
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
{
return
{
{ "thread_number", std::make_shared<DataTypeUInt32>() },
{ "query_id", std::make_shared<DataTypeString>() },
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
};
}
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on the number of queued signals in the OS and otherwise signals may get lost.
/// Also, non-RT signals are not delivered while a previous signal is being handled (by default; but we use RT signals).
/// Obviously, results for different threads may be out of sync.
/// There is no better way to enumerate threads in a process other than looking into procfs.
std::filesystem::directory_iterator end;
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
{
pid_t tid = parse<pid_t>(it->path().filename());
sigval sig_value{};
sig_value.sival_int = sequence_num;
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may have already finished.
if (ESRCH == errno)
continue;
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
/// Just in case, wait for the pipe with a timeout, in case the signal didn't get processed.
if (wait(100))
{
size_t stack_trace_size = stack_trace->getSize();
size_t stack_trace_offset = stack_trace->getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace->getFrames()[i]));
res_columns[0]->insert(thread_number);
res_columns[1]->insertData(query_id_data, query_id_size);
res_columns[2]->insert(arr);
}
else
{
/// Cannot obtain a stack trace, but create a record in the result nevertheless.
res_columns[0]->insert(tid); /// TODO Replace all thread numbers with OS thread numbers.
res_columns[1]->insertDefault();
res_columns[2]->insertDefault();
}
sequence_num = static_cast<int>(static_cast<unsigned>(sequence_num) + 1);
}
}
}
#endif

View File

@ -0,0 +1,37 @@
#pragma once
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include <mutex>
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
/// Allows introspecting the stack traces of all server threads.
/// It acts like an embedded debugger.
/// More than one instance of this table cannot be used.
class StorageSystemStackTrace : public ext::shared_ptr_helper<StorageSystemStackTrace>, public IStorageSystemOneBlock<StorageSystemStackTrace>
{
friend struct ext::shared_ptr_helper<StorageSystemStackTrace>;
public:
String getName() const override { return "SystemStackTrace"; }
static NamesAndTypesList getNamesAndTypes();
StorageSystemStackTrace(const String & name_);
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
mutable std::mutex mutex;
};
}
#endif
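A small illustrative query against the new table (column list taken from getNamesAndTypes above; the table is attached as system.stack_trace below). The trace column contains raw frame addresses.
SELECT thread_number, query_id, trace
FROM system.stack_trace
LIMIT 5;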

View File

@ -39,6 +39,10 @@
#include <Storages/System/StorageSystemDisks.h>
#include <Storages/System/StorageSystemStoragePolicies.h>
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
#endif
namespace DB
{
@ -65,6 +69,9 @@ void attachSystemTablesLocal(IDatabase & system_database)
system_database.attachTable("collations", StorageSystemCollations::create("collations"));
system_database.attachTable("table_engines", StorageSystemTableEngines::create("table_engines"));
system_database.attachTable("contributors", StorageSystemContributors::create("contributors"));
#ifdef OS_LINUX
system_database.attachTable("stack_trace", StorageSystemStackTrace::create("stack_trace"));
#endif
}
void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)

View File

@ -14,27 +14,49 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation VALUES ('2001-01-01
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" &
check_query1="SELECT substr(latest_fail_reason, 1, 8) as ErrorCode FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND ErrorCode != ''"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
sleep 0.1
done
sleep 0.1
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation'"
wait
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that blocks another mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" &
check_query2="SELECT substr(latest_fail_reason, 1, 8) as ErrorCode FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt' AND ErrorCode != ''"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
sleep 0.1
done
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"
sleep 0.1
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"
wait_for_mutation "kill_mutation" "mutation_5.txt" "test"
wait
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.kill_mutation"

View File

@ -1,6 +1,7 @@
*** Create and kill a single invalid mutation ***
0000000000 1 1 Code: 6,
waiting test kill_mutation_r1 0000000000
Mutation 0000000000 was killed
*** Create and kill invalid mutation that blocks another mutation ***
0000000001 1 1 Code: 6,
waiting test kill_mutation_r1 0000000001

View File

@ -17,13 +17,25 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation_r1 VALUES ('2001-01
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1"
# wrong mutation
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "Mutation 0000000000 was killed" &
sleep 1
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0', '20010101_0_0_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'"
check_query1="SELECT substr(latest_fail_reason, 1, 8) as ErrorCode FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND ErrorCode != ''"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
sleep 0.1
done
$CLICKHOUSE_CLIENT --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0', '20010101_0_0_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation_r1'"
wait
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'"
@ -31,14 +43,25 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that
${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA test.kill_mutation_r1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1"
sleep 1
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0_1', '20010101_0_0_0_1'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
# good mutation, but blocked by the wrong one
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutations_sync=2" &
check_query2="SELECT substr(latest_fail_reason, 1, 8) as ErrorCode FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001' AND ErrorCode != ''"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
sleep 0.1
done
$CLICKHOUSE_CLIENT --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0_1', '20010101_0_0_0_1'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
wait_for_mutation "kill_mutation_r2" "0000000002" "test"
wait
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.kill_mutation_r2"

View File

@ -34,7 +34,7 @@ $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill' ASYNC" &>/dev/nul
sleep 1
# Kill $query_for_pending SYNC. This query is not blocker, so it should be killed fast.
timeout 5 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending' SYNC" &>/dev/null
timeout 10 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending' SYNC" &>/dev/null
# Both queries have to be killed, doesn't matter with SYNC or ASYNC kill
for run in {1..15}

View File

@ -0,0 +1,8 @@
Replicated
1
1
1
Normal
1
1
1

View File

@ -0,0 +1,43 @@
DROP TABLE IF EXISTS table_for_synchronous_mutations1;
DROP TABLE IF EXISTS table_for_synchronous_mutations2;
SELECT 'Replicated';
CREATE TABLE table_for_synchronous_mutations1(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/table_for_synchronous_mutations', '1') ORDER BY k;
CREATE TABLE table_for_synchronous_mutations2(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/table_for_synchronous_mutations', '2') ORDER BY k;
INSERT INTO table_for_synchronous_mutations1 select number, number from numbers(100000);
SYSTEM SYNC REPLICA table_for_synchronous_mutations2;
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1';
-- Another mutation, just to be sure that the previous one finished
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1';
DROP TABLE IF EXISTS table_for_synchronous_mutations1;
DROP TABLE IF EXISTS table_for_synchronous_mutations2;
SELECT 'Normal';
DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;
CREATE TABLE table_for_synchronous_mutations_no_replication(k UInt32, v1 UInt64) ENGINE MergeTree ORDER BY k;
INSERT INTO table_for_synchronous_mutations_no_replication select number, number from numbers(100000);
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication';
-- Another mutation, just to be sure that the previous one finished
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication';
DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;

View File

@ -0,0 +1,16 @@
drop table if exists default.test_01051_d;
drop table if exists default.test_view_01051_d;
drop dictionary if exists default.test_dict_01051_d;
create table default.test_01051_d (key UInt64, value String) engine = MergeTree order by key;
create view default.test_view_01051_d (key UInt64, value String) as select k2 + 1 as key, v2 || '_x' as value from (select key + 2 as k2, value || '_y' as v2 from default.test_01051_d);
insert into default.test_01051_d values (1, 'a');
create dictionary default.test_dict_01051_d (key UInt64, value String) primary key key source(clickhouse(host 'localhost' port '9000' user 'default' password '' db 'default' table 'test_view_01051_d')) layout(flat()) lifetime(100500);
select dictGet('default.test_dict_01051_d', 'value', toUInt64(4));
drop table if exists default.test_01051_d;
drop table if exists default.test_view_01051_d;
drop dictionary if exists default.test_dict_01051_d;

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS a;
DROP TABLE IF EXISTS b;
DROP TABLE IF EXISTS c;
CREATE TABLE a (x UInt64) ENGINE = Memory;
CREATE TABLE b (x UInt64) ENGINE = Memory;
CREATE TABLE c (x UInt64) ENGINE = Memory;
SET enable_optimize_predicate_expression = 0;
SELECT a.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON a.x = c.x;
SELECT a.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON b.x = c.x;
SELECT b.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON b.x = c.x;
SELECT c.x AS x FROM a
LEFT JOIN b ON a.x = b.x
LEFT JOIN c ON b.x = c.x;
DROP TABLE a;
DROP TABLE b;
DROP TABLE c;

View File

@ -21,14 +21,14 @@ mv ./dbms/unit_tests_dbms /output
find . -name '*.so' -print -exec mv '{}' /output \;
find . -name '*.so.*' -print -exec mv '{}' /output \;
count=`ls -1 /output/*.so 2>/dev/null | wc -l`
if [ $count != 0 ]
# May be set for split build or for performance test.
if [ "" != "$COMBINED_OUTPUT" ]
then
mkdir -p /output/config
cp ../dbms/programs/server/config.xml /output/config
cp ../dbms/programs/server/users.xml /output/config
cp -r ../dbms/programs/server/config.d /output/config
tar -czvf shared_build.tgz /output
tar -czvf "$COMBINED_OUTPUT.tgz" /output
rm -r /output/*
mv shared_build.tgz /output
mv "$COMBINED_OUTPUT.tgz" /output
fi

View File

@ -103,7 +103,7 @@ def run_vagrant_box_with_env(image_path, output_dir, ch_root):
logging.info("Copying binary back")
vagrant.copy_from_image("~/ClickHouse/dbms/programs/clickhouse", output_dir)
def parse_env_variables(build_type, compiler, sanitizer, package_type, cache, distcc_hosts, unbundled, split_binary, version, author, official, alien_pkgs, with_coverage):
def parse_env_variables(build_type, compiler, sanitizer, package_type, image_type, cache, distcc_hosts, unbundled, split_binary, version, author, official, alien_pkgs, with_coverage):
CLANG_PREFIX = "clang"
DARWIN_SUFFIX = "-darwin"
ARM_SUFFIX = "-aarch64"
@ -135,15 +135,21 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, cache, di
cxx = cc.replace('gcc', 'g++').replace('clang', 'clang++')
if package_type == "deb":
if image_type == "deb":
result.append("DEB_CC={}".format(cc))
result.append("DEB_CXX={}".format(cxx))
elif package_type == "binary":
elif image_type == "binary":
result.append("CC={}".format(cc))
result.append("CXX={}".format(cxx))
cmake_flags.append('-DCMAKE_C_COMPILER=`which {}`'.format(cc))
cmake_flags.append('-DCMAKE_CXX_COMPILER=`which {}`'.format(cxx))
# Create combined output archive for split build and for performance tests.
if package_type == "performance":
result.append("COMBINED_OUTPUT=performance")
elif split_binary:
result.append("COMBINED_OUTPUT=shared_build")
if sanitizer:
result.append("SANITIZER={}".format(sanitizer))
if build_type:
@ -193,7 +199,8 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, cache, di
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
parser = argparse.ArgumentParser(description="ClickHouse building script using prebuilt Docker image")
parser.add_argument("--package-type", choices=IMAGE_MAP.keys(), required=True)
# 'performance' creates a combined .tgz with the server and configs to be used for performance tests.
parser.add_argument("--package-type", choices=['deb', 'binary', 'performance', 'freebsd'], required=True)
parser.add_argument("--clickhouse-repo-path", default="../../")
parser.add_argument("--output-dir", required=True)
parser.add_argument("--build-type", choices=("debug", ""), default="")
@ -215,25 +222,26 @@ if __name__ == "__main__":
if not os.path.isabs(args.output_dir):
args.output_dir = os.path.abspath(os.path.join(os.getcwd(), args.output_dir))
image_name = IMAGE_MAP[args.package_type]
image_type = 'binary' if args.package_type == 'performance' else args.package_type
image_name = IMAGE_MAP[image_type]
if not os.path.isabs(args.clickhouse_repo_path):
ch_root = os.path.abspath(os.path.join(os.getcwd(), args.clickhouse_repo_path))
else:
ch_root = args.clickhouse_repo_path
if args.alien_pkgs and not args.package_type == "deb":
if args.alien_pkgs and not image_type == "deb":
raise Exception("Can add alien packages only in deb build")
dockerfile = os.path.join(ch_root, "docker/packager", args.package_type, "Dockerfile")
if args.package_type != "freebsd" and not check_image_exists_locally(image_name) or args.force_build_image:
dockerfile = os.path.join(ch_root, "docker/packager", image_type, "Dockerfile")
if image_type != "freebsd" and not check_image_exists_locally(image_name) or args.force_build_image:
if not pull_image(image_name) or args.force_build_image:
build_image(image_name, dockerfile)
env_prepared = parse_env_variables(
args.build_type, args.compiler, args.sanitizer, args.package_type,
args.build_type, args.compiler, args.sanitizer, args.package_type, image_type,
args.cache, args.distcc_hosts, args.unbundled, args.split_binary,
args.version, args.author, args.official, args.alien_pkgs, args.with_coverage)
if args.package_type != "freebsd":
if image_type != "freebsd":
run_docker_image_with_env(image_name, args.output_dir, env_prepared, ch_root, args.ccache_dir)
else:
logging.info("Running freebsd build, arguments will be ignored")

View File

@ -11,7 +11,7 @@ install_packages() {
}
download_data() {
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
clickhouse-client --query "ATTACH DATABASE IF NOT EXISTS datasets ENGINE = Ordinary"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS test"
/s3downloader --dataset-names $OPEN_DATASETS
/s3downloader --dataset-names $PRIVATE_DATASETS --url 'https://s3.mds.yandex.net/clickhouse-private-datasets'

View File

@ -39,7 +39,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
&& /s3downloader --dataset-names $DATASETS \
&& chmod 777 -R /var/lib/clickhouse \
&& clickhouse-client --query "SHOW DATABASES" \
&& clickhouse-client --query "CREATE DATABASE datasets" \
&& clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary" \
&& clickhouse-client --query "CREATE DATABASE test" \
&& service clickhouse-server restart && sleep 5 \
&& clickhouse-client --query "SHOW TABLES FROM datasets" \

Some files were not shown because too many files have changed in this diff.