Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2017-10-13 04:09:51 +03:00
commit 0ec218a853
97 changed files with 1440 additions and 1011 deletions

View File

@ -79,7 +79,7 @@ This is a bugfix release. The following bugs were fixed:
## Major changes:
* Improved security: all server files are created with 0640 permissions.
* Improved security: all server files are created with 0640 permissions (can be changed via <umask> config parameter).
* Improved error messages for queries with invalid syntax.
* Significantly reduced memory consumption and improved performance when merging large MergeTree data parts.
* Significantly increased the performance of data merges for the ReplacingMergeTree engine.

View File

@ -77,7 +77,7 @@
* Added the max_size parameter for the aggregate function `groupArray(max_size)(column)`, and optimized its performance
## Major changes:
* Improved security: all server files are created with 0640 permissions
* Improved security: all server files are created with 0640 permissions (can be changed via the <umask> config parameter).
* Improved error messages for syntactically invalid queries
* Significantly reduced memory consumption and improved performance of merges of large MergeTree data parts
* Significantly increased the performance of data merges for the ReplacingMergeTree engine

View File

@ -2,4 +2,4 @@ ClickHouse is an open-source column-oriented database management system that all
[Read more...](https://clickhouse.yandex/)
[ClickHouse Meetup in Berlin on October 5, 2017](https://events.yandex.com/events/meetings/05-10-2017/)
[ClickHouse Community Meetup in Palo Alto on October 25, 2017](http://bit.ly/clickhouse-meetup-palo-alto-october-2017)

View File

@ -308,7 +308,7 @@ void Connection::sendQuery(
if (!connected)
connect();
network_compression_method = settings ? settings->network_compression_method.value : CompressionMethod::LZ4;
compression_settings = settings ? CompressionSettings(*settings) : CompressionSettings(CompressionMethod::LZ4);
query_id = query_id_;
@ -380,7 +380,7 @@ void Connection::sendData(const Block & block, const String & name)
if (!block_out)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, network_compression_method);
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_settings);
else
maybe_compressed_out = out;

View File

@ -16,6 +16,8 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <IO/CompressionSettings.h>
#include <Interpreters/Settings.h>
#include <Interpreters/TablesStatus.h>
@ -222,8 +224,9 @@ private:
String query_id;
Protocol::Compression compression; /// Enable data compression for communication.
Protocol::Encryption encryption; /// Enable data encryption for communication.
/// What compression algorithm to use while sending data for INSERT queries and external tables.
CompressionMethod network_compression_method = CompressionMethod::LZ4;
/// What compression settings to use while sending data for INSERT queries and external tables.
CompressionSettings compression_settings;
/** If not nullptr, used to limit network traffic.
* Only traffic for transferring blocks is accounted; other packets are not.

View File

@ -910,6 +910,23 @@ ColumnPtr ColumnArray::replicateTuple(const Offsets_t & replicate_offsets) const
}
ColumnPtr ColumnArray::getLengthsColumn() const
{
const auto & offsets_data = getOffsets();
size_t size = offsets_data.size();
auto column = std::make_shared<ColumnVector<ColumnArray::Offset_t>>(offsets->size());
auto & data = column->getData();
if (size)
data[0] = offsets_data[0];
for (size_t i = 1; i < size; ++i)
data[i] = offsets_data[i] - offsets_data[i - 1];
return column;
}
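getLengthsColumn() turns the cumulative offsets of a ColumnArray into per-row lengths: the first length is the first offset, and each subsequent length is the difference of adjacent offsets. A minimal standalone sketch of the same computation on plain vectors (illustrative only, not part of the commit):

#include <cstdint>
#include <vector>

/// Sketch: offsets {2, 5, 5, 9} describe array rows of lengths {2, 3, 0, 4}.
std::vector<uint64_t> lengthsFromOffsets(const std::vector<uint64_t> & offsets)
{
    std::vector<uint64_t> lengths(offsets.size());
    if (!offsets.empty())
        lengths[0] = offsets[0];
    for (size_t i = 1; i < offsets.size(); ++i)
        lengths[i] = offsets[i] - offsets[i - 1];
    return lengths;
}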
void ColumnArray::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);

View File

@ -80,6 +80,9 @@ public:
return scatterImpl<ColumnArray>(num_columns, selector);
}
/// Creates and returns a column with array sizes.
ColumnPtr getLengthsColumn() const;
void gather(ColumnGathererStream & gatherer_stream) override;
private:

View File

@ -0,0 +1,63 @@
#pragma once
#include <atomic>
namespace DB
{
/// An atomic variable used to block and interrupt certain actions.
/// If it is non-zero, the actions associated with it should be considered interrupted.
class ActionBlocker
{
private:
mutable std::atomic<int> counter{0};
public:
bool isCancelled() const { return counter > 0; }
/// Temporarily blocks corresponding actions (while the returned object is alive)
struct BlockHolder;
BlockHolder cancel() const { return BlockHolder(this); }
/// Cancel the actions forever.
void cancelForever() const { ++counter; }
/// Returns a reference to the counter so that it can be watched directly.
auto & getCounter() { return counter; }
/// Blocks the related action while a BlockHolder instance exists
struct BlockHolder
{
explicit BlockHolder(const ActionBlocker * var_ = nullptr) : var(var_)
{
if (var)
++var->counter;
}
BlockHolder(BlockHolder && other) noexcept
{
*this = std::move(other);
}
BlockHolder & operator=(BlockHolder && other) noexcept
{
var = other.var;
other.var = nullptr;
return *this;
}
BlockHolder(const BlockHolder & other) = delete;
BlockHolder & operator=(const BlockHolder & other) = delete;
~BlockHolder()
{
if (var)
--var->counter;
}
private:
const ActionBlocker * var = nullptr;
};
};
}
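ActionBlocker keeps an atomic count of outstanding blocks: while any BlockHolder returned by cancel() is alive (or after cancelForever()), isCancelled() returns true and the guarded activity should back off. A hedged usage sketch built only from the API above; the surrounding function names are hypothetical and the header path is assumed:

#include <Common/ActionBlocker.h>   /// assumed header location

/// Hypothetical background activity guarded by an ActionBlocker (sketch, not from the commit).
void runBackgroundActionIfAllowed(const DB::ActionBlocker & blocker)
{
    if (blocker.isCancelled())
        return;                        /// Someone holds a BlockHolder: skip the action.
    /// ... perform the action ...
}

/// Hypothetical operation that must not overlap with the background activity.
void doExclusiveWork(const DB::ActionBlocker & blocker)
{
    auto holder = blocker.cancel();    /// The action stays blocked while `holder` is alive.
    /// ... do the exclusive work ...
}                                      /// `holder` is destroyed: counter decremented, action unblocked.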

View File

@ -35,6 +35,10 @@
M(StorageBufferBytes) \
M(DictCacheRequests) \
M(Revision) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
M(RWLockActiveWriters)
namespace CurrentMetrics

View File

@ -127,6 +127,10 @@
M(DataAfterMergeDiffersFromReplica) \
M(PolygonsAddedToPool) \
M(PolygonsInPoolAllocatedBytes) \
M(RWLockAcquiredReadLocks) \
M(RWLockAcquiredWriteLocks) \
M(RWLockReadersWaitMilliseconds) \
M(RWLockWritersWaitMilliseconds)
namespace ProfileEvents
{

View File

@ -1,7 +1,27 @@
#include "RWLockFIFO.h"
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <iostream>
#include <Poco/Ext/ThreadNumber.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event RWLockAcquiredReadLocks;
extern const Event RWLockAcquiredWriteLocks;
extern const Event RWLockReadersWaitMilliseconds;
extern const Event RWLockWritersWaitMilliseconds;
}
namespace CurrentMetrics
{
extern const Metric RWLockWaitingReaders;
extern const Metric RWLockWaitingWriters;
extern const Metric RWLockActiveReaders;
extern const Metric RWLockActiveWriters;
}
namespace DB
@ -13,12 +33,42 @@ namespace ErrorCodes
}
RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::Client client)
class RWLockFIFO::LockHandlerImpl
{
RWLockFIFOPtr parent;
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
ThreadToHandler::iterator it_handler;
CurrentMetrics::Increment active_client_increment;
LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
public:
LockHandlerImpl(const LockHandlerImpl & other) = delete;
~LockHandlerImpl();
friend class RWLockFIFO;
};
RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::Client client)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
: CurrentMetrics::RWLockWaitingWriters);
auto finalize_metrics = [type, &watch] ()
{
ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockAcquiredReadLocks
: ProfileEvents::RWLockAcquiredWriteLocks);
ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockReadersWaitMilliseconds
: ProfileEvents::RWLockWritersWaitMilliseconds, watch.elapsedMilliseconds());
};
auto this_thread_id = std::this_thread::get_id();
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
std::unique_lock<std::mutex> lock(mutex);
@ -79,6 +129,7 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
if (it_group == queue.begin())
{
it_client->start_time = it_client->enqueue_time;
finalize_metrics();
return res;
}
@ -86,6 +137,7 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); } );
it_client->start_time = time(nullptr);
finalize_metrics();
return res;
}
@ -133,6 +185,9 @@ RWLockFIFO::LockHandlerImpl::~LockHandlerImpl()
RWLockFIFO::LockHandlerImpl::LockHandlerImpl(RWLockFIFOPtr && parent, RWLockFIFO::GroupsContainer::iterator it_group,
RWLockFIFO::ClientsContainer::iterator it_client)
: parent{std::move(parent)}, it_group{it_group}, it_client{it_client} {}
: parent{std::move(parent)}, it_group{it_group}, it_client{it_client},
active_client_increment{(it_client->type == RWLockFIFO::Read) ? CurrentMetrics::RWLockActiveReaders
: CurrentMetrics::RWLockActiveWriters}
{}
}
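The instrumentation added to getLock() follows a small, reusable pattern: a CurrentMetrics::Increment RAII object counts waiting (and, in LockHandlerImpl, active) clients, a coarse Stopwatch times the wait, and ProfileEvents::increment records acquisitions and accumulated wait milliseconds once the lock is granted. A minimal sketch of that pattern around a generic wait (metric and event names are the ones declared above; the wait itself is a placeholder):

/// Sketch only: instrumenting a read-lock wait the way RWLockFIFO::getLock does.
void waitForReadLockInstrumented()
{
    Stopwatch watch(CLOCK_MONOTONIC_COARSE);
    CurrentMetrics::Increment waiting_increment(CurrentMetrics::RWLockWaitingReaders);

    /// ... block here until the lock is granted (placeholder) ...

    ProfileEvents::increment(ProfileEvents::RWLockAcquiredReadLocks);
    ProfileEvents::increment(ProfileEvents::RWLockReadersWaitMilliseconds, watch.elapsedMilliseconds());
}   /// `waiting_increment` is destroyed here, decrementing RWLockWaitingReaders.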

View File

@ -40,6 +40,7 @@ public:
bool isStarted() { return start_time != 0; }
/// TODO: delete the extra info below if it is no longer needed.
std::string info;
int thread_number = 0;
std::time_t enqueue_time = 0;
@ -50,6 +51,7 @@ public:
/// Just use LockHandler::reset() to release the lock
class LockHandlerImpl;
friend class LockHandlerImpl;
using LockHandler = std::shared_ptr<LockHandlerImpl>;
@ -87,28 +89,6 @@ private:
explicit Group(Type type) : type{type} {}
};
public:
class LockHandlerImpl
{
RWLockFIFOPtr parent;
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
ThreadToHandler::iterator it_handler;
LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
public:
LockHandlerImpl(const LockHandlerImpl & other) = delete;
~LockHandlerImpl();
friend class RWLockFIFO;
};
private:
mutable std::mutex mutex;
GroupsContainer queue;
ThreadToHandler thread_to_handler;

View File

@ -65,10 +65,10 @@ namespace
/// These return codes from the child process let us reliably detect errors that occurred while creating it.
enum class ReturnCodes : int
{
CANNOT_DUP_STDIN = 42, /// The value is not important, but it is chosen so that it's rare to conflict with the program return code.
CANNOT_DUP_STDOUT = 43,
CANNOT_DUP_STDERR = 44,
CANNOT_EXEC = 45,
CANNOT_DUP_STDIN = 0x55555555, /// The value is not important, but it is chosen so that it's rare to conflict with the program return code.
CANNOT_DUP_STDOUT = 0x55555556,
CANNOT_DUP_STDERR = 0x55555557,
CANNOT_EXEC = 0x55555558,
};
}

View File

@ -20,11 +20,12 @@ public:
*/
Stopwatch(clockid_t clock_type_ = CLOCK_MONOTONIC) : clock_type(clock_type_) { restart(); }
void start() { setStart(); is_running = true; }
void stop() { updateElapsed(); is_running = false; }
void restart() { elapsed_ns = 0; start(); }
UInt64 elapsed() const { updateElapsed(); return elapsed_ns; }
double elapsedSeconds() const { updateElapsed(); return static_cast<double>(elapsed_ns) / 1000000000ULL; }
void start() { setStart(); is_running = true; }
void stop() { updateElapsed(); is_running = false; }
void restart() { elapsed_ns = 0; start(); }
UInt64 elapsed() const { updateElapsed(); return elapsed_ns; }
UInt64 elapsedMilliseconds() const { updateElapsed(); return elapsed_ns / 1000000UL; }
double elapsedSeconds() const { updateElapsed(); return static_cast<double>(elapsed_ns) / 1000000000ULL; }
private:
mutable UInt64 start_ns;
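The only new accessor in this hunk is elapsedMilliseconds(), which simply divides the accumulated nanoseconds by 1000000, alongside the existing accessors. A tiny usage sketch (the timed work is a placeholder and the helper name is hypothetical):

/// Sketch: timing a piece of work with the new accessor.
UInt64 timeSomeWorkMs()
{
    Stopwatch watch;                      /// starts timing immediately (CLOCK_MONOTONIC by default)
    /// ... do the work to be measured ...
    watch.stop();
    return watch.elapsedMilliseconds();   /// elapsed_ns / 1000000
}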

View File

@ -10,10 +10,9 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
extern const int TYPE_MISMATCH;
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
extern const int TYPE_MISMATCH;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
@ -97,6 +96,9 @@ void NullableAdapterBlockInputStream::buildActions(
{
size_t in_size = in_sample.columns();
if (out_sample.columns() != in_size)
throw Exception("Number of columns in INSERT SELECT doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
actions.reserve(in_size);
rename.reserve(in_size);

View File

@ -27,6 +27,7 @@ private:
String getID() const override { return "Owning(" + stream->getID() + ")"; }
protected:
BlockInputStreamPtr stream;
std::unique_ptr<OwnType> own;
};

View File

@ -1,16 +1,12 @@
#include <thread>
#include <future>
#include <Dictionaries/ExecutableDictionarySource.h>
#include <Common/ShellCommand.h>
#include <Interpreters/Context.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <common/logger_useful.h>
@ -20,6 +16,28 @@ namespace DB
static const size_t max_block_size = 8192;
namespace
{
/// Owns the ShellCommand and calls wait() on it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
public:
ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ShellCommand> own)
: OwningBlockInputStream(std::move(stream), std::move(own))
{
}
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
own->wait();
}
};
}
ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
@ -47,10 +65,13 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
LOG_TRACE(log, "loadAll " + toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ShellCommand>>(input_stream, std::move(process));
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
}
namespace
{
/** A stream that also starts and waits for a background thread
 * (which feeds data into the pipe to be read from the other side).
*/
@ -86,20 +107,29 @@ private:
void readSuffix() override
{
IProfilingBlockInputStream::readSuffix();
if (!wait_called)
{
wait_called = true;
command->wait();
}
thread.join();
/// To rethrow an exception, if any.
task.get_future().get();
}
String getName() const override { return "WithBackgroundThread"; }
String getID() const override { return "WithBackgroundThread(" + stream->getID() + ")"; }
String getID() const override { return "WithBackgroundThread(" + stream->getID() + ")"; }
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
std::thread thread;
bool wait_called = false;
};
}
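BlockInputStreamWithBackgroundThread relies on std::packaged_task to carry an exception out of the feeding thread: readSuffix() waits for the command, joins the thread, and then calls task.get_future().get(), which rethrows whatever the background work threw. A minimal self-contained sketch of that rethrow pattern in plain C++ (not tied to ClickHouse types):

#include <cstdio>
#include <future>
#include <stdexcept>
#include <thread>

int main()
{
    std::packaged_task<void()> task([] { throw std::runtime_error("failure in background thread"); });
    std::thread thread(std::ref(task));   /// run the task in a background thread

    thread.join();                        /// wait for the thread, as readSuffix() does
    try
    {
        task.get_future().get();          /// rethrows the exception captured by the packaged_task
    }
    catch (const std::exception & e)
    {
        std::fprintf(stderr, "background error: %s\n", e.what());
    }
    return 0;
}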
BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
{

View File

@ -31,7 +31,6 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
#if USE_POINT_IN_POLYGON
namespace FunctionPointInPolygonDetail
{
@ -272,18 +271,15 @@ template <>
const char * FunctionPointInPolygon<PointInPolygonFranklin>::name = "pointInPolygonFranklin";
template <>
const char * FunctionPointInPolygon<PointInPolygonWithGrid, true>::name = "pointInPolygon";
#endif
void registerFunctionsGeo(FunctionFactory & factory)
{
factory.registerFunction<FunctionGreatCircleDistance>();
factory.registerFunction<FunctionPointInEllipses>();
#if USE_POINT_IN_POLYGON
factory.registerFunction<FunctionPointInPolygon<PointInPolygonFranklin>>();
factory.registerFunction<FunctionPointInPolygon<PointInPolygonWinding>>();
factory.registerFunction<FunctionPointInPolygon<PointInPolygonCrossing>>();
factory.registerFunction<FunctionPointInPolygon<PointInPolygonWithGrid, true>>();
#endif
}
}

View File

@ -1,502 +0,0 @@
#include <Functions/GatherUtils.h>
namespace DB
{
/// Creates IArraySource from ColumnArray
template <typename ... Types>
struct ArraySourceCreator;
template <typename Type, typename ... Types>
struct ArraySourceCreator<Type, Types ...>
{
static std::unique_ptr<IArraySource>
create(const ColumnArray & col, const ColumnUInt8 * null_map, bool is_const, size_t total_rows)
{
if (typeid_cast<const ColumnVector<Type> *>(&col.getData()))
{
if (null_map)
{
if (is_const)
return std::make_unique<ConstSource<NullableArraySource<NumericArraySource<Type>>>>(col, *null_map, total_rows);
return std::make_unique<NullableArraySource<NumericArraySource<Type>>>(col, *null_map);
}
if (is_const)
return std::make_unique<ConstSource<NumericArraySource<Type>>>(col, total_rows);
return std::make_unique<NumericArraySource<Type>>(col);
}
return ArraySourceCreator<Types...>::create(col, null_map, is_const, total_rows);
}
};
template <>
struct ArraySourceCreator<>
{
static std::unique_ptr<IArraySource>
create(const ColumnArray & col, const ColumnUInt8 * null_map, bool is_const, size_t total_rows)
{
if (null_map)
{
if (is_const)
return std::make_unique<ConstSource<NullableArraySource<GenericArraySource>>>(col, *null_map, total_rows);
return std::make_unique<NullableArraySource<GenericArraySource>>(col, *null_map);
}
if (is_const)
return std::make_unique<ConstSource<GenericArraySource>>(col, total_rows);
return std::make_unique<GenericArraySource>(col);
}
};
std::unique_ptr<IArraySource> createArraySource(const ColumnArray & col, bool is_const, size_t total_rows)
{
using Creator = typename ApplyTypeListForClass<ArraySourceCreator, TypeListNumbers>::Type;
if (auto column_nullable = typeid_cast<const ColumnNullable *>(&col.getData()))
{
ColumnArray column(column_nullable->getNestedColumn(), col.getOffsetsColumn());
return Creator::create(column, &column_nullable->getNullMapConcreteColumn(), is_const, total_rows);
}
return Creator::create(col, nullptr, is_const, total_rows);
}
/// Creates IArraySink from ColumnArray
template <typename ... Types>
struct ArraySinkCreator;
template <typename Type, typename ... Types>
struct ArraySinkCreator<Type, Types ...>
{
static std::unique_ptr<IArraySink> create(ColumnArray & col, ColumnUInt8 * null_map, size_t column_size)
{
if (typeid_cast<ColumnVector<Type> *>(&col.getData()))
{
if (null_map)
return std::make_unique<NullableArraySink<NumericArraySink<Type>>>(col, *null_map, column_size);
return std::make_unique<NumericArraySink<Type>>(col, column_size);
}
return ArraySinkCreator<Types ...>::create(col, null_map, column_size);
}
};
template <>
struct ArraySinkCreator<>
{
static std::unique_ptr<IArraySink> create(ColumnArray & col, ColumnUInt8 * null_map, size_t column_size)
{
if (null_map)
return std::make_unique<NullableArraySink<GenericArraySink>>(col, *null_map, column_size);
return std::make_unique<GenericArraySink>(col, column_size);
}
};
std::unique_ptr<IArraySink> createArraySink(ColumnArray & col, size_t column_size)
{
using Creator = ApplyTypeListForClass<ArraySinkCreator, TypeListNumbers>::Type;
if (auto column_nullable = typeid_cast<ColumnNullable *>(&col.getData()))
{
ColumnArray column(column_nullable->getNestedColumn(), col.getOffsetsColumn());
return Creator::create(column, &column_nullable->getNullMapConcreteColumn(), column_size);
}
return Creator::create(col, nullptr, column_size);
}
/// Base classes which select a template function implementation with a concrete ArraySource or ArraySink.
/// Derived classes should implement selectImpl for ArraySourceSelector and ArraySinkSelector,
/// or selectSourceSink for ArraySinkSourceSelector.
template <typename Base, typename ... Types>
struct ArraySourceSelector;
template <typename Base, typename Type, typename ... Types>
struct ArraySourceSelector<Base, Type, Types ...>
{
template <typename ... Args>
static void select(IArraySource & source, Args && ... args)
{
if (auto array = typeid_cast<NumericArraySource<Type> *>(&source))
Base::selectImpl(*array, args ...);
else if (auto nullable_array = typeid_cast<NullableArraySource<NumericArraySource<Type>> *>(&source))
Base::selectImpl(*nullable_array, args ...);
else if (auto const_array = typeid_cast<ConstSource<NumericArraySource<Type>> *>(&source))
Base::selectImpl(*const_array, args ...);
else if (auto const_nullable_array = typeid_cast<ConstSource<NullableArraySource<NumericArraySource<Type>>> *>(&source))
Base::selectImpl(*const_nullable_array, args ...);
else
ArraySourceSelector<Base, Types ...>::select(source, args ...);
}
};
template <typename Base>
struct ArraySourceSelector<Base>
{
template <typename ... Args>
static void select(IArraySource & source, Args && ... args)
{
if (auto array = typeid_cast<GenericArraySource *>(&source))
Base::selectImpl(*array, args ...);
else if (auto nullable_array = typeid_cast<NullableArraySource<GenericArraySource> *>(&source))
Base::selectImpl(*nullable_array, args ...);
else if (auto const_array = typeid_cast<ConstSource<GenericArraySource> *>(&source))
Base::selectImpl(*const_array, args ...);
else if (auto const_nullable_array = typeid_cast<ConstSource<NullableArraySource<GenericArraySource>> *>(&source))
Base::selectImpl(*const_nullable_array, args ...);
else
throw Exception(std::string("Unknown ArraySource type: ") + typeid(source).name(), ErrorCodes::LOGICAL_ERROR);
}
};
template <typename Base>
using GetArraySourceSelector = typename ApplyTypeListForClass<ArraySourceSelector,
typename PrependToTypeList<Base, TypeListNumbers>::Type>::Type;
template <typename Base, typename ... Types>
struct ArraySinkSelector;
template <typename Base, typename Type, typename ... Types>
struct ArraySinkSelector<Base, Type, Types ...>
{
template <typename ... Args>
static void select(IArraySink & sink, Args && ... args)
{
if (auto nullable_numeric_sink = typeid_cast<NullableArraySink<NumericArraySink<Type>> *>(&sink))
Base::selectImpl(*nullable_numeric_sink, args ...);
else if (auto numeric_sink = typeid_cast<NumericArraySink<Type> *>(&sink))
Base::selectImpl(*numeric_sink, args ...);
else
ArraySinkSelector<Base, Types ...>::select(sink, args ...);
}
};
template <typename Base>
struct ArraySinkSelector<Base>
{
template <typename ... Args>
static void select(IArraySink & sink, Args && ... args)
{
if (auto nullable_generic_sink = typeid_cast<NullableArraySink<GenericArraySink> *>(&sink))
Base::selectImpl(*nullable_generic_sink, args ...);
else if (auto generic_sink = typeid_cast<GenericArraySink *>(&sink))
Base::selectImpl(*generic_sink, args ...);
else
throw Exception(std::string("Unknown ArraySink type: ") + typeid(sink).name(), ErrorCodes::LOGICAL_ERROR);
}
};
template <typename Base>
using GetArraySinkSelector = typename ApplyTypeListForClass<ArraySinkSelector,
typename PrependToTypeList<Base, TypeListNumbers>::Type>::Type;
template <typename Base>
struct ArraySinkSourceSelector
{
template <typename ... Args>
static void select(IArraySource & source, IArraySink & sink, Args && ... args)
{
GetArraySinkSelector<Base>::select(sink, source, args ...);
}
template <typename Sink, typename ... Args>
static void selectImpl(Sink && sink, IArraySource & source, Args && ... args)
{
GetArraySourceSelector<Base>::select(source, sink, args ...);
}
template <typename Source, typename Sink, typename ... Args>
static void selectImpl(Source && source, Sink && sink, Args && ... args)
{
Base::selectSourceSink(source, sink, args ...);
}
};
/// Algorithms.
/// Appends slices from source to sink. Offsets for sink should be precalculated as start positions of result arrays.
/// Only for NumericArraySource, because values cannot be inserted in the middle of an arbitrary column.
/// Used for array concat implementation.
template <typename Source, typename Sink>
static void append(Source && source, Sink && sink)
{
sink.row_num = 0;
while (!source.isEnd())
{
sink.current_offset = sink.offsets[sink.row_num];
writeSlice(source.getWhole(), sink);
sink.next();
source.next();
}
}
struct ArrayAppend : public GetArraySourceSelector<ArrayAppend>
{
template <typename Source, typename Sink>
static void selectImpl(Source && source, Sink && sink)
{
append(source, sink);
}
};
template <typename Sink>
static void append(IArraySource & source, Sink && sink)
{
ArrayAppend::select(source, sink);
}
/// Concat specialization for GenericArraySource, because append cannot be used with an arbitrary column type.
template <typename SourceType, typename SinkType>
struct ConcatGenericArrayWriteWholeImpl
{
static void writeWhole(GenericArraySource * generic_source, SinkType && sink)
{
auto source = static_cast<SourceType *>(generic_source);
writeSlice(source->getWhole(), sink);
source->next();
}
};
template <typename Sink>
static void NO_INLINE concatGenericArray(const std::vector<std::unique_ptr<IArraySource>> & sources, Sink && sink)
{
std::vector<GenericArraySource *> generic_sources;
std::vector<bool> is_nullable;
std::vector<bool> is_const;
generic_sources.reserve(sources.size());
is_nullable.assign(sources.size(), false);
is_const.assign(sources.size(), false);
for (auto i : ext::range(0, sources.size()))
{
const auto & source = sources[i];
if (auto generic_source = typeid_cast<GenericArraySource *>(source.get()))
generic_sources.push_back(static_cast<GenericArraySource *>(generic_source));
else if (auto const_generic_source = typeid_cast<ConstSource<GenericArraySource> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(const_generic_source));
is_const[i] = true;
}
else if (auto nullable_source = typeid_cast<NullableArraySource<GenericArraySource> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(nullable_source));
is_nullable[i] = true;
}
else if (auto const_nullable_source = typeid_cast<ConstSource<NullableArraySource<GenericArraySource>> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(const_nullable_source));
is_nullable[i] = is_const[i] = true;
}
else
throw Exception(
std::string("GenericArraySource expected for GenericArraySink, got: ") + typeid(source).name(),
ErrorCodes::LOGICAL_ERROR);
}
while (!sink.isEnd())
{
for (auto i : ext::range(0, sources.size()))
{
auto source = generic_sources[i];
if (is_const[i])
{
if (is_nullable[i])
ConcatGenericArrayWriteWholeImpl<ConstSource<NullableArraySource<GenericArraySource>>, Sink>::writeWhole(source, sink);
else
ConcatGenericArrayWriteWholeImpl<ConstSource<GenericArraySource>, Sink>::writeWhole(source, sink);
}
else
{
if (is_nullable[i])
ConcatGenericArrayWriteWholeImpl<NullableArraySource<GenericArraySource>, Sink>::writeWhole(source, sink);
else
ConcatGenericArrayWriteWholeImpl<GenericArraySource, Sink>::writeWhole(source, sink);
}
}
sink.next();
}
}
/// Concat for array sources. Sources must be either all numeric or all generic.
template <typename Sink>
void NO_INLINE concat(const std::vector<std::unique_ptr<IArraySource>> & sources, Sink && sink)
{
size_t elements_to_reserve = 0;
bool is_first = true;
/// Prepare offsets column. Offsets should point to starts of result arrays.
for (const auto & source : sources)
{
elements_to_reserve += source->getSizeForReserve();
const auto & offsets = source->getOffsets();
if (is_first)
{
sink.offsets.resize(source->getColumnSize());
memset(&sink.offsets[0], 0, sink.offsets.size() * sizeof(offsets[0]));
is_first = false;
}
if (source->isConst())
{
for (size_t i : ext::range(1, offsets.size()))
{
sink.offsets[i] += offsets[0];
}
}
else
{
for (size_t i : ext::range(1, offsets.size()))
{
sink.offsets[i] += offsets[i - 1] - (i > 1 ? offsets[i - 2] : 0);
}
}
}
for (auto i : ext::range(1, sink.offsets.size()))
{
sink.offsets[i] += sink.offsets[i - 1];
}
sink.reserve(elements_to_reserve);
for (const auto & source : sources)
{
append(*source, sink);
}
}
struct ArrayConcat : public GetArraySinkSelector<ArrayConcat>
{
using Sources = std::vector<std::unique_ptr<IArraySource>>;
template <typename Sink>
static void selectImpl(Sink && sink, Sources & sources)
{
concat<Sink>(sources, sink);
}
static void selectImpl(GenericArraySink & sink, Sources & sources)
{
concatGenericArray(sources, sink);
}
static void selectImpl(NullableArraySink<GenericArraySink> & sink, Sources & sources)
{
concatGenericArray(sources, sink);
}
static void selectImpl(GenericArraySink && sink, Sources && sources)
{
concatGenericArray(sources, sink);
}
static void selectImpl(NullableArraySink<GenericArraySink> && sink, Sources & sources)
{
concatGenericArray(sources, sink);
}
};
void concat(std::vector<std::unique_ptr<IArraySource>> & sources, IArraySink & sink)
{
return ArrayConcat::select(sink, sources);
}
/// Slice for array sources.
struct SliceFromLeftConstantOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromLeftConstantOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset)
{
sliceFromLeftConstantOffsetUnbounded(source, sink, offset);
}
};
struct SliceFromLeftConstantOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromLeftConstantOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset, ssize_t & length)
{
sliceFromLeftConstantOffsetBounded(source, sink, offset, length);
}
};
struct SliceFromRightConstantOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromRightConstantOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset)
{
sliceFromRightConstantOffsetUnbounded(source, sink, offset);
}
};
struct SliceFromRightConstantOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromRightConstantOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset, ssize_t & length)
{
sliceFromRightConstantOffsetBounded(source, sink, offset, length);
}
};
struct SliceDynamicOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceDynamicOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, IColumn & offset_column)
{
sliceDynamicOffsetUnbounded(source, sink, offset_column);
}
};
struct SliceDynamicOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceDynamicOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, IColumn & offset_column, IColumn & length_column)
{
sliceDynamicOffsetBounded(source, sink, offset_column, length_column);
}
};
void sliceFromLeftConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset)
{
SliceFromLeftConstantOffsetUnboundedSelectArraySource::select(src, sink, offset);
}
void sliceFromLeftConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length)
{
SliceFromLeftConstantOffsetBoundedSelectArraySource::select(src, sink, offset, length);
}
void sliceFromRightConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset)
{
SliceFromRightConstantOffsetUnboundedSelectArraySource::select(src, sink, offset);
}
void sliceFromRightConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length)
{
SliceFromRightConstantOffsetBoundedSelectArraySource::select(src, sink, offset, length);
}
void sliceDynamicOffsetUnbounded(IArraySource & src, IArraySink & sink, IColumn & offset_column)
{
SliceDynamicOffsetUnboundedSelectArraySource::select(src, sink, offset_column);
}
void sliceDynamicOffsetBounded(IArraySource & src, IArraySink & sink, IColumn & offset_column, IColumn & length_column)
{
SliceDynamicOffsetBoundedSelectArraySource::select(src, sink, offset_column, length_column);
}
}

View File

@ -0,0 +1,196 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
/// Algorithms.
/// Appends slices from source to sink. Offsets for sink should be precalculated as start positions of result arrays.
/// Only for NumericArraySource, because values cannot be inserted in the middle of an arbitrary column.
/// Used for array concat implementation.
template <typename Source, typename Sink>
static void append(Source && source, Sink && sink)
{
sink.row_num = 0;
while (!source.isEnd())
{
sink.current_offset = sink.offsets[sink.row_num];
writeSlice(source.getWhole(), sink);
sink.next();
source.next();
}
}
struct ArrayAppend : public GetArraySourceSelector<ArrayAppend>
{
template <typename Source, typename Sink>
static void selectImpl(Source && source, Sink && sink)
{
append(source, sink);
}
};
template <typename Sink>
static void append(IArraySource & source, Sink && sink)
{
ArrayAppend::select(source, sink);
}
/// Concat specialization for GenericArraySource, because append cannot be used with an arbitrary column type.
template <typename SourceType, typename SinkType>
struct ConcatGenericArrayWriteWholeImpl
{
static void writeWhole(GenericArraySource * generic_source, SinkType && sink)
{
auto source = static_cast<SourceType *>(generic_source);
writeSlice(source->getWhole(), sink);
source->next();
}
};
template <typename Sink>
static void NO_INLINE concatGenericArray(const std::vector<std::unique_ptr<IArraySource>> & sources, Sink && sink)
{
std::vector<GenericArraySource *> generic_sources;
std::vector<bool> is_nullable;
std::vector<bool> is_const;
generic_sources.reserve(sources.size());
is_nullable.assign(sources.size(), false);
is_const.assign(sources.size(), false);
for (auto i : ext::range(0, sources.size()))
{
const auto & source = sources[i];
if (auto generic_source = typeid_cast<GenericArraySource *>(source.get()))
generic_sources.push_back(static_cast<GenericArraySource *>(generic_source));
else if (auto const_generic_source = typeid_cast<ConstSource<GenericArraySource> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(const_generic_source));
is_const[i] = true;
}
else if (auto nullable_source = typeid_cast<NullableArraySource<GenericArraySource> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(nullable_source));
is_nullable[i] = true;
}
else if (auto const_nullable_source = typeid_cast<ConstSource<NullableArraySource<GenericArraySource>> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(const_nullable_source));
is_nullable[i] = is_const[i] = true;
}
else
throw Exception(
std::string("GenericArraySource expected for GenericArraySink, got: ") + typeid(source).name(), ErrorCodes::LOGICAL_ERROR);
}
while (!sink.isEnd())
{
for (auto i : ext::range(0, sources.size()))
{
auto source = generic_sources[i];
if (is_const[i])
{
if (is_nullable[i])
ConcatGenericArrayWriteWholeImpl<ConstSource<NullableArraySource<GenericArraySource>>, Sink>::writeWhole(source, sink);
else
ConcatGenericArrayWriteWholeImpl<ConstSource<GenericArraySource>, Sink>::writeWhole(source, sink);
}
else
{
if (is_nullable[i])
ConcatGenericArrayWriteWholeImpl<NullableArraySource<GenericArraySource>, Sink>::writeWhole(source, sink);
else
ConcatGenericArrayWriteWholeImpl<GenericArraySource, Sink>::writeWhole(source, sink);
}
}
sink.next();
}
}
/// Concat for array sources. Sources must be either all numeric or all generic.
template <typename Sink>
void NO_INLINE concat(const std::vector<std::unique_ptr<IArraySource>> & sources, Sink && sink)
{
size_t elements_to_reserve = 0;
bool is_first = true;
/// Prepare offsets column. Offsets should point to starts of result arrays.
for (const auto & source : sources)
{
elements_to_reserve += source->getSizeForReserve();
const auto & offsets = source->getOffsets();
if (is_first)
{
sink.offsets.resize(source->getColumnSize());
memset(&sink.offsets[0], 0, sink.offsets.size() * sizeof(offsets[0]));
is_first = false;
}
if (source->isConst())
{
for (size_t i : ext::range(1, offsets.size()))
{
sink.offsets[i] += offsets[0];
}
}
else
{
for (size_t i : ext::range(1, offsets.size()))
{
sink.offsets[i] += offsets[i - 1] - (i > 1 ? offsets[i - 2] : 0);
}
}
}
for (auto i : ext::range(1, sink.offsets.size()))
{
sink.offsets[i] += sink.offsets[i - 1];
}
sink.reserve(elements_to_reserve);
for (const auto & source : sources)
{
append(*source, sink);
}
}
struct ArrayConcat : public GetArraySinkSelector<ArrayConcat>
{
using Sources = std::vector<std::unique_ptr<IArraySource>>;
template <typename Sink>
static void selectImpl(Sink && sink, Sources & sources)
{
concat<Sink>(sources, sink);
}
static void selectImpl(GenericArraySink & sink, Sources & sources)
{
concatGenericArray(sources, sink);
}
static void selectImpl(NullableArraySink<GenericArraySink> & sink, Sources & sources)
{
concatGenericArray(sources, sink);
}
static void selectImpl(GenericArraySink && sink, Sources && sources)
{
concatGenericArray(sources, sink);
}
static void selectImpl(NullableArraySink<GenericArraySink> && sink, Sources & sources)
{
concatGenericArray(sources, sink);
}
};
void concat(std::vector<std::unique_ptr<IArraySource>> & sources, IArraySink & sink)
{
return ArrayConcat::select(sink, sources);
}
}
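The offset preparation inside concat() first accumulates, for every result row, the lengths contributed by each source (a constant source contributes offsets[0] to every row), and then prefix-sums the result so that sink.offsets[i] ends up as the start position of result row i. A standalone sketch of that arithmetic for non-constant sources, on plain vectors (illustrative only):

#include <cstdint>
#include <vector>

/// Sketch: compute result-row start positions when concatenating non-constant array sources.
/// Each source is given by cumulative offsets, e.g. {2, 5} means rows of lengths {2, 3}.
std::vector<uint64_t> concatStartOffsets(const std::vector<std::vector<uint64_t>> & sources_offsets)
{
    std::vector<uint64_t> starts(sources_offsets.empty() ? 0 : sources_offsets.front().size(), 0);
    for (const auto & offsets : sources_offsets)
        for (size_t i = 1; i < offsets.size(); ++i)
            starts[i] += offsets[i - 1] - (i > 1 ? offsets[i - 2] : 0);   /// length of row i - 1
    for (size_t i = 1; i < starts.size(); ++i)
        starts[i] += starts[i - 1];                                       /// prefix sum: starts of result rows
    return starts;   /// For {{2, 5}, {1, 5}} the result is {0, 3}: row 0 starts at 0, row 1 at 3.
}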

View File

@ -0,0 +1,47 @@
#include "GatherUtils.h"
namespace DB
{
/// Creates IArraySink from ColumnArray
template <typename... Types>
struct ArraySinkCreator;
template <typename Type, typename... Types>
struct ArraySinkCreator<Type, Types...>
{
static std::unique_ptr<IArraySink> create(ColumnArray & col, ColumnUInt8 * null_map, size_t column_size)
{
if (typeid_cast<ColumnVector<Type> *>(&col.getData()))
{
if (null_map)
return std::make_unique<NullableArraySink<NumericArraySink<Type>>>(col, *null_map, column_size);
return std::make_unique<NumericArraySink<Type>>(col, column_size);
}
return ArraySinkCreator<Types...>::create(col, null_map, column_size);
}
};
template <>
struct ArraySinkCreator<>
{
static std::unique_ptr<IArraySink> create(ColumnArray & col, ColumnUInt8 * null_map, size_t column_size)
{
if (null_map)
return std::make_unique<NullableArraySink<GenericArraySink>>(col, *null_map, column_size);
return std::make_unique<GenericArraySink>(col, column_size);
}
};
std::unique_ptr<IArraySink> createArraySink(ColumnArray & col, size_t column_size)
{
using Creator = ApplyTypeListForClass<ArraySinkCreator, TypeListNumbers>::Type;
if (auto column_nullable = typeid_cast<ColumnNullable *>(&col.getData()))
{
ColumnArray column(column_nullable->getNestedColumn(), col.getOffsetsColumn());
return Creator::create(column, &column_nullable->getNullMapConcreteColumn(), column_size);
}
return Creator::create(col, nullptr, column_size);
}
}

View File

@ -0,0 +1,59 @@
#include "GatherUtils.h"
namespace DB
{
/// Creates IArraySource from ColumnArray
template <typename... Types>
struct ArraySourceCreator;
template <typename Type, typename... Types>
struct ArraySourceCreator<Type, Types...>
{
static std::unique_ptr<IArraySource> create(const ColumnArray & col, const ColumnUInt8 * null_map, bool is_const, size_t total_rows)
{
if (typeid_cast<const ColumnVector<Type> *>(&col.getData()))
{
if (null_map)
{
if (is_const)
return std::make_unique<ConstSource<NullableArraySource<NumericArraySource<Type>>>>(col, *null_map, total_rows);
return std::make_unique<NullableArraySource<NumericArraySource<Type>>>(col, *null_map);
}
if (is_const)
return std::make_unique<ConstSource<NumericArraySource<Type>>>(col, total_rows);
return std::make_unique<NumericArraySource<Type>>(col);
}
return ArraySourceCreator<Types...>::create(col, null_map, is_const, total_rows);
}
};
template <>
struct ArraySourceCreator<>
{
static std::unique_ptr<IArraySource> create(const ColumnArray & col, const ColumnUInt8 * null_map, bool is_const, size_t total_rows)
{
if (null_map)
{
if (is_const)
return std::make_unique<ConstSource<NullableArraySource<GenericArraySource>>>(col, *null_map, total_rows);
return std::make_unique<NullableArraySource<GenericArraySource>>(col, *null_map);
}
if (is_const)
return std::make_unique<ConstSource<GenericArraySource>>(col, total_rows);
return std::make_unique<GenericArraySource>(col);
}
};
std::unique_ptr<IArraySource> createArraySource(const ColumnArray & col, bool is_const, size_t total_rows)
{
using Creator = typename ApplyTypeListForClass<ArraySourceCreator, TypeListNumbers>::Type;
if (auto column_nullable = typeid_cast<const ColumnNullable *>(&col.getData()))
{
ColumnArray column(column_nullable->getNestedColumn(), col.getOffsetsColumn());
return Creator::create(column, &column_nullable->getNullMapConcreteColumn(), is_const, total_rows);
}
return Creator::create(col, nullptr, is_const, total_rows);
}
}

View File

@ -0,0 +1,112 @@
#include "GatherUtils.h"
namespace DB
{
/// Base classes which select a template function implementation with a concrete ArraySource or ArraySink.
/// Derived classes should implement selectImpl for ArraySourceSelector and ArraySinkSelector,
/// or selectSourceSink for ArraySinkSourceSelector.
template <typename Base, typename ... Types>
struct ArraySourceSelector;
template <typename Base, typename Type, typename ... Types>
struct ArraySourceSelector<Base, Type, Types ...>
{
template <typename ... Args>
static void select(IArraySource & source, Args && ... args)
{
if (auto array = typeid_cast<NumericArraySource<Type> *>(&source))
Base::selectImpl(*array, args ...);
else if (auto nullable_array = typeid_cast<NullableArraySource<NumericArraySource<Type>> *>(&source))
Base::selectImpl(*nullable_array, args ...);
else if (auto const_array = typeid_cast<ConstSource<NumericArraySource<Type>> *>(&source))
Base::selectImpl(*const_array, args ...);
else if (auto const_nullable_array = typeid_cast<ConstSource<NullableArraySource<NumericArraySource<Type>>> *>(&source))
Base::selectImpl(*const_nullable_array, args ...);
else
ArraySourceSelector<Base, Types ...>::select(source, args ...);
}
};
template <typename Base>
struct ArraySourceSelector<Base>
{
template <typename ... Args>
static void select(IArraySource & source, Args && ... args)
{
if (auto array = typeid_cast<GenericArraySource *>(&source))
Base::selectImpl(*array, args ...);
else if (auto nullable_array = typeid_cast<NullableArraySource<GenericArraySource> *>(&source))
Base::selectImpl(*nullable_array, args ...);
else if (auto const_array = typeid_cast<ConstSource<GenericArraySource> *>(&source))
Base::selectImpl(*const_array, args ...);
else if (auto const_nullable_array = typeid_cast<ConstSource<NullableArraySource<GenericArraySource>> *>(&source))
Base::selectImpl(*const_nullable_array, args ...);
else
throw Exception(std::string("Unknown ArraySource type: ") + typeid(source).name(), ErrorCodes::LOGICAL_ERROR);
}
};
template <typename Base>
using GetArraySourceSelector = typename ApplyTypeListForClass<ArraySourceSelector,
typename PrependToTypeList<Base, TypeListNumbers>::Type>::Type;
template <typename Base, typename ... Types>
struct ArraySinkSelector;
template <typename Base, typename Type, typename ... Types>
struct ArraySinkSelector<Base, Type, Types ...>
{
template <typename ... Args>
static void select(IArraySink & sink, Args && ... args)
{
if (auto nullable_numeric_sink = typeid_cast<NullableArraySink<NumericArraySink<Type>> *>(&sink))
Base::selectImpl(*nullable_numeric_sink, args ...);
else if (auto numeric_sink = typeid_cast<NumericArraySink<Type> *>(&sink))
Base::selectImpl(*numeric_sink, args ...);
else
ArraySinkSelector<Base, Types ...>::select(sink, args ...);
}
};
template <typename Base>
struct ArraySinkSelector<Base>
{
template <typename ... Args>
static void select(IArraySink & sink, Args && ... args)
{
if (auto nullable_generic_sink = typeid_cast<NullableArraySink<GenericArraySink> *>(&sink))
Base::selectImpl(*nullable_generic_sink, args ...);
else if (auto generic_sink = typeid_cast<GenericArraySink *>(&sink))
Base::selectImpl(*generic_sink, args ...);
else
throw Exception(std::string("Unknown ArraySink type: ") + typeid(sink).name(), ErrorCodes::LOGICAL_ERROR);
}
};
template <typename Base>
using GetArraySinkSelector = typename ApplyTypeListForClass<ArraySinkSelector,
typename PrependToTypeList<Base, TypeListNumbers>::Type>::Type;
template <typename Base>
struct ArraySinkSourceSelector
{
template <typename ... Args>
static void select(IArraySource & source, IArraySink & sink, Args && ... args)
{
GetArraySinkSelector<Base>::select(sink, source, args ...);
}
template <typename Sink, typename ... Args>
static void selectImpl(Sink && sink, IArraySource & source, Args && ... args)
{
GetArraySourceSelector<Base>::select(source, sink, args ...);
}
template <typename Source, typename Sink, typename ... Args>
static void selectImpl(Source && source, Sink && sink, Args && ... args)
{
Base::selectSourceSink(source, sink, args ...);
}
};
}

View File

@ -0,0 +1,19 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
struct SliceDynamicOffsetBoundedSelectArraySource : public ArraySinkSourceSelector<SliceDynamicOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, IColumn & offset_column, IColumn & length_column)
{
sliceDynamicOffsetBounded(source, sink, offset_column, length_column);
}
};
void sliceDynamicOffsetBounded(IArraySource & src, IArraySink & sink, IColumn & offset_column, IColumn & length_column)
{
SliceDynamicOffsetBoundedSelectArraySource::select(src, sink, offset_column, length_column);
}
}

View File

@ -0,0 +1,20 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
struct SliceDynamicOffsetUnboundedSelectArraySource : public ArraySinkSourceSelector<SliceDynamicOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, IColumn & offset_column)
{
sliceDynamicOffsetUnbounded(source, sink, offset_column);
}
};
void sliceDynamicOffsetUnbounded(IArraySource & src, IArraySink & sink, IColumn & offset_column)
{
SliceDynamicOffsetUnboundedSelectArraySource::select(src, sink, offset_column);
}
}

View File

@ -0,0 +1,20 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
struct SliceFromLeftConstantOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromLeftConstantOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset, ssize_t & length)
{
sliceFromLeftConstantOffsetBounded(source, sink, offset, length);
}
};
void sliceFromLeftConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length)
{
SliceFromLeftConstantOffsetBoundedSelectArraySource::select(src, sink, offset, length);
}
}

View File

@ -0,0 +1,20 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
struct SliceFromLeftConstantOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromLeftConstantOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset)
{
sliceFromLeftConstantOffsetUnbounded(source, sink, offset);
}
};
void sliceFromLeftConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset)
{
SliceFromLeftConstantOffsetUnboundedSelectArraySource::select(src, sink, offset);
}
}

View File

@ -0,0 +1,20 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
struct SliceFromRightConstantOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromRightConstantOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset, ssize_t & length)
{
sliceFromRightConstantOffsetBounded(source, sink, offset, length);
}
};
void sliceFromRightConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length)
{
SliceFromRightConstantOffsetBoundedSelectArraySource::select(src, sink, offset, length);
}
}

View File

@ -0,0 +1,20 @@
#include "GatherUtils.h"
#include "GatherUtils_selectors.h"
namespace DB
{
struct SliceFromRightConstantOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromRightConstantOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset)
{
sliceFromRightConstantOffsetUnbounded(source, sink, offset);
}
};
void sliceFromRightConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset)
{
SliceFromRightConstantOffsetUnboundedSelectArraySource::select(src, sink, offset);
}
}

View File

@ -19,11 +19,6 @@
#pragma GCC diagnostic pop
#endif
#if __clang__ && __clang_major__ <= 4
#else
#define USE_POINT_IN_POLYGON 1
#endif
#include <boost/geometry/geometries/point_xy.hpp>
#include <boost/geometry/geometries/polygon.hpp>
#include <boost/geometry/geometries/multi_polygon.hpp>
@ -83,7 +78,6 @@ UInt64 getMultiPolygonAllocatedBytes(const MultiPolygon & multi_polygon)
return size;
}
#if USE_POINT_IN_POLYGON
template <typename CoordinateType = Float32>
class PointInPolygonWithGrid
{
@ -553,7 +547,7 @@ struct CallPointInPolygon<Type, Types ...>
template <typename PointInPolygonImpl>
static ColumnPtr call(const IColumn & x, const IColumn & y, PointInPolygonImpl && impl)
{
using Impl = typename ApplyTypeListForClass<CallPointInPolygon, TypeListNumbers>::Type;
using Impl = typename ApplyTypeListForClass<::DB::GeoUtils::CallPointInPolygon, TypeListNumbers>::Type;
if (auto column = typeid_cast<const ColumnVector<Type> *>(&x))
return Impl::template call<Type>(*column, y, impl);
return CallPointInPolygon<Types ...>::call(x, y, impl);
@ -579,12 +573,10 @@ struct CallPointInPolygon<>
template <typename PointInPolygonImpl>
ColumnPtr pointInPolygon(const IColumn & x, const IColumn & y, PointInPolygonImpl && impl)
{
using Impl = typename ApplyTypeListForClass<CallPointInPolygon, TypeListNumbers>::Type;
using Impl = typename ApplyTypeListForClass<::DB::GeoUtils::CallPointInPolygon, TypeListNumbers>::Type;
return Impl::call(x, y, impl);
}
#endif
/// Total angle (signed) between neighboring vectors in the linestring. Zero if linestring.size() < 2.
template <typename Linestring>
float calcLinestringRotation(const Linestring & points)

View File

@ -33,7 +33,7 @@ void CompressedWriteBuffer::nextImpl()
/** The format of compressed block - see CompressedStream.h
*/
switch (method)
switch (compression_settings.method)
{
case CompressionMethod::LZ4:
case CompressionMethod::LZ4HC:
@ -47,7 +47,7 @@ void CompressedWriteBuffer::nextImpl()
compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::LZ4);
if (method == CompressionMethod::LZ4)
if (compression_settings.method == CompressionMethod::LZ4)
compressed_size = header_size + LZ4_compress_default(
working_buffer.begin(),
&compressed_buffer[header_size],
@ -59,7 +59,7 @@ void CompressedWriteBuffer::nextImpl()
&compressed_buffer[header_size],
uncompressed_size,
LZ4_COMPRESSBOUND(uncompressed_size),
0);
compression_settings.level);
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
@ -83,7 +83,7 @@ void CompressedWriteBuffer::nextImpl()
compressed_buffer.size() - header_size,
working_buffer.begin(),
uncompressed_size,
1);
compression_settings.level);
if (ZSTD_isError(res))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_COMPRESS);
@ -131,9 +131,9 @@ void CompressedWriteBuffer::nextImpl()
CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_,
CompressionMethod method_,
CompressionSettings compression_settings_,
size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), method(method_)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), compression_settings(compression_settings_)
{
}

View File

@ -6,7 +6,7 @@
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressedStream.h>
#include <IO/CompressionSettings.h>
namespace DB
@ -16,7 +16,7 @@ class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
private:
WriteBuffer & out;
CompressionMethod method;
CompressionSettings compression_settings;
PODArray<char> compressed_buffer;
@ -25,7 +25,7 @@ private:
public:
CompressedWriteBuffer(
WriteBuffer & out_,
CompressionMethod method_ = CompressionMethod::LZ4,
CompressionSettings compression_settings = CompressionSettings(),
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
/// The amount of compressed data

View File

@ -0,0 +1,52 @@
#include <Interpreters/Settings.h>
#include "CompressionSettings.h"
namespace DB
{
CompressionSettings::CompressionSettings()
{
}
CompressionSettings::CompressionSettings(CompressionMethod method, int level):
method(method),
level(level)
{
}
CompressionSettings::CompressionSettings(CompressionMethod method):
CompressionSettings(method, getDefaultLevel(method))
{
}
CompressionSettings::CompressionSettings(const Settings & settings)
{
method = settings.network_compression_method;
switch (method)
{
case CompressionMethod::ZSTD:
level = settings.network_zstd_compression_level;
break;
default:
level = getDefaultLevel(method);
}
}
int CompressionSettings::getDefaultLevel(CompressionMethod method)
{
switch (method)
{
case CompressionMethod::LZ4:
return -1;
case CompressionMethod::LZ4HC:
return 0;
case CompressionMethod::ZSTD:
return 1;
default:
return -1;
}
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <IO/CompressedStream.h>
namespace DB
{
class Settings;
struct CompressionSettings
{
CompressionMethod method = CompressionMethod::LZ4;
int level;
CompressionSettings();
CompressionSettings(CompressionMethod method);
CompressionSettings(CompressionMethod method, int level);
CompressionSettings(const Settings & settings);
static int getDefaultLevel(CompressionMethod method);
};
}
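CompressionSettings bundles the codec and its level, so callers such as Connection and CompressedWriteBuffer no longer pass a bare CompressionMethod; when built from Settings it also picks up network_zstd_compression_level for ZSTD. A hedged sketch of the wiring using the constructors shown above (the function and parameter names are illustrative):

/// Sketch only: feeding CompressionSettings into CompressedWriteBuffer.
void writeCompressed(WriteBuffer & plain_out, const Settings & user_settings)
{
    /// Derive codec and level from user settings, or construct explicitly,
    /// e.g. CompressionSettings(CompressionMethod::ZSTD, 1).
    CompressionSettings compression_settings(user_settings);

    CompressedWriteBuffer compressed_out(plain_out, compression_settings);   /// was: (out, CompressionMethod)
    /// ... write data through compressed_out ...
}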

View File

@ -10,7 +10,7 @@ namespace DB
namespace
{
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic<bool> * is_cancelled)
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic<int> * is_cancelled)
{
/// When the buffer has been read to the end, eof() either refills it with new data and moves the cursor to the beginning (returning false), or returns true.
while (bytes > 0 && !from.eof())
@ -55,7 +55,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to)
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<bool> & is_cancelled)
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<int> & is_cancelled)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled);
}
@ -70,7 +70,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
copyDataImpl(from, to, true, bytes, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<bool> & is_cancelled)
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<int> & is_cancelled)
{
copyDataImpl(from, to, true, bytes, &is_cancelled);
}

View File

@ -21,8 +21,8 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
/** The same, but with a cancellation condition.
*/
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<bool> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<bool> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<int> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<int> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
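The cancellation flag for copyData changes from std::atomic&lt;bool&gt; to std::atomic&lt;int&gt;, which matches the counter that ActionBlocker::getCounter() exposes earlier in this commit; any non-zero value stops the copy. A hedged sketch of that combination (the pairing is an inference from the two files, not something the commit states explicitly):

/// Sketch: cancel a long copy through an ActionBlocker's counter (inferred usage).
void copyUntilBlocked(DB::ReadBuffer & from, DB::WriteBuffer & to, DB::ActionBlocker & blocker)
{
    /// copyData stops as soon as the counter becomes non-zero, i.e. while any
    /// BlockHolder obtained from blocker.cancel() is alive.
    DB::copyData(from, to, blocker.getCounter());
}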

View File

@ -16,13 +16,14 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <DataStreams/FormatFactory.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/ReshardingWorker.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionMethodSelector.h>
#include <Storages/CompressionSettingsSelector.h>
#include <Interpreters/Settings.h>
#include <Interpreters/Users.h>
#include <Interpreters/Quota.h>
@ -42,7 +43,6 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Databases/IDatabase.h>
#include <Common/ConfigProcessor.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -126,8 +126,8 @@ struct ContextShared
Macros macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of parts of queries, if necessary.
std::shared_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression method, depending on the size of the part.
mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector;
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionSettingsSelector> compression_settings_selector;
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
@ -1384,22 +1384,22 @@ PartLog * Context::getPartLog(const String & database, const String & table)
}
CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const
CompressionSettings Context::chooseCompressionSettings(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();
if (!shared->compression_method_selector)
if (!shared->compression_settings_selector)
{
constexpr auto config_name = "compression";
auto & config = getConfigRef();
if (config.has(config_name))
shared->compression_method_selector = std::make_unique<CompressionMethodSelector>(config, "compression");
shared->compression_settings_selector = std::make_unique<CompressionSettingsSelector>(config, "compression");
else
shared->compression_method_selector = std::make_unique<CompressionMethodSelector>();
shared->compression_settings_selector = std::make_unique<CompressionSettingsSelector>();
}
return shared->compression_method_selector->choose(part_size, part_size_ratio);
return shared->compression_settings_selector->choose(part_size, part_size_ratio);
}
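As a usage sketch of the renamed API, assuming only what this diff shows (Context::chooseCompressionSettings and the CompressedWriteBuffer constructor taking CompressionSettings); the helper name and size figures are made up for illustration:

#include <IO/CompressionSettings.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/Context.h>

/// Illustrative only: pick compression for a part based on its size and its share of all data,
/// as the MergeTree writers further down in this change do, then wrap a file in a compressed buffer.
void writePartData(const DB::Context & context, const std::string & path,
                   size_t part_size_bytes, size_t total_active_bytes)
{
    DB::CompressionSettings settings = context.chooseCompressionSettings(
        part_size_bytes,
        static_cast<double>(part_size_bytes) / total_active_bytes);

    DB::WriteBufferFromFile plain(path);
    DB::CompressedWriteBuffer compressed(plain, settings);
    /// ... serialize column data into `compressed` here ...
    compressed.next();
    plain.next();
}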

View File

@ -11,7 +11,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/Settings.h>
#include <Interpreters/ClientInfo.h>
#include <IO/CompressedStream.h>
#include <IO/CompressionSettings.h>
namespace Poco
@ -328,8 +328,8 @@ public:
void setMaxTableSizeToDrop(size_t max_size);
void checkTableCanBeDropped(const String & database, const String & table, size_t table_size);
/// Lets you select the compression method according to the conditions described in the configuration file.
CompressionMethod chooseCompressionMethod(size_t part_size, double part_size_ratio) const;
/// Lets you select the compression settings according to the conditions described in the configuration file.
CompressionSettings chooseCompressionSettings(size_t part_size, double part_size_ratio) const;
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;

View File

@ -1014,7 +1014,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
* For example, in the table there is a column "domain(URL)", and we requested domain(URL).
*/
String function_string = func_node->getColumnName();
NamesAndTypesList::const_iterator it = findColumn(function_string);
auto it = findColumn(function_string);
if (columns.end() != it)
{
ast = std::make_shared<ASTIdentifier>(func_node->range, function_string);
@ -1051,24 +1051,34 @@ void ExpressionAnalyzer::normalizeTreeImpl(
if (identifier_node->kind == ASTIdentifier::Column)
{
/// If it is an alias, but not a parent alias (for constructs like "SELECT column + 1 AS column").
Aliases::const_iterator jt = aliases.find(identifier_node->name);
if (jt != aliases.end() && current_alias != identifier_node->name)
auto it_alias = aliases.find(identifier_node->name);
if (it_alias != aliases.end() && current_alias != identifier_node->name)
{
/// Let's replace it with the corresponding tree node.
if (current_asts.count(jt->second.get()))
if (current_asts.count(it_alias->second.get()))
throw Exception("Cyclic aliases", ErrorCodes::CYCLIC_ALIASES);
if (!my_alias.empty() && my_alias != jt->second->getAliasOrColumnName())
if (!my_alias.empty() && my_alias != it_alias->second->getAliasOrColumnName())
{
/// In a construct like "a AS b", where a is an alias, you must set alias b to the result of substituting alias a.
ast = jt->second->clone();
ast->setAlias(my_alias);
/// Avoid infinite recursion here
auto replace_to_identifier = typeid_cast<ASTIdentifier *>(it_alias->second.get());
bool is_cycle = replace_to_identifier &&
replace_to_identifier->kind == ASTIdentifier::Column &&
replace_to_identifier->name == identifier_node->name;
if (!is_cycle)
{
/// In a construct like "a AS b", where a is an alias, you must set alias b to the result of substituting alias a.
ast = it_alias->second->clone();
ast->setAlias(my_alias);
replaced = true;
}
}
else
{
ast = jt->second;
ast = it_alias->second;
replaced = true;
}
replaced = true;
}
}
}

View File

@ -100,30 +100,11 @@ void InterpreterSelectQuery::init(const BlockInputStreamPtr & input, const Names
}
}
if (is_first_select_inside_union_all && (hasAsterisk() || hasAggregation(query)))
{
basicInit(input);
renameColumns();
if (!required_column_names.empty())
rewriteExpressionList(required_column_names);
// We execute this code here, because otherwise the following kind of query would not work
// SELECT X FROM (SELECT * FROM (SELECT 1 AS X, 2 AS Y) UNION ALL SELECT 3, 4)
// because the asterisk is replaced with columns only when query_analyzer objects are created in basicInit().
renameColumns();
if (!required_column_names.empty() && (table_column_names.size() != required_column_names.size()))
{
rewriteExpressionList(required_column_names);
/// The information needed to execute the query is now out of date. We update it.
initQueryAnalyzer();
}
}
else
{
renameColumns();
if (!required_column_names.empty())
rewriteExpressionList(required_column_names);
basicInit(input);
}
basicInit(input);
}
bool InterpreterSelectQuery::hasAggregation(const ASTSelectQuery & query_ptr)

View File

@ -6,9 +6,11 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/ActionBlocker.h>
#include <Core/Types.h>
#include <map>
#include <atomic>
#include <utility>
#include <Poco/Net/HTMLForm.h>
namespace Poco { namespace Net { class HTTPServerResponse; } }
@ -67,11 +69,8 @@ public:
virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0;
virtual ~InterserverIOEndpoint() {}
void cancel() { is_cancelled = true; }
protected:
/// You need to stop the data transfer.
std::atomic<bool> is_cancelled {false};
/// The data transfer must be stopped if the blocker is activated.
ActionBlocker blocker;
};
using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;
@ -88,7 +87,7 @@ public:
std::lock_guard<std::mutex> lock(mutex);
if (endpoint_map.count(name))
throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT);
endpoint_map[name] = endpoint;
endpoint_map[name] = std::move(endpoint);
}
void removeEndpoint(const String & name)
@ -119,7 +118,7 @@ class InterserverIOEndpointHolder
{
public:
InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_)
: name(name_), endpoint(endpoint_), handler(handler_)
: name(name_), endpoint(std::move(endpoint_)), handler(handler_)
{
handler.addEndpoint(name, endpoint);
}
@ -143,7 +142,9 @@ public:
}
}
void cancel() { endpoint->cancel(); }
ActionBlocker & getBlocker() { return endpoint->blocker; }
void cancelForever() { getBlocker().cancelForever(); }
ActionBlocker::BlockHolder cancel() { return getBlocker().cancel(); }
private:
String name;
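The ActionBlocker introduced here replaces the per-class cancellation flags; its behaviour can be inferred from the BlockerImpl it supersedes in MergeTreeDataMerger further down (increment a counter while a holder is alive, treat a positive counter as "cancelled", and make cancelForever() an increment that is never undone). A standalone sketch of that pattern, simplified and not the actual Common/ActionBlocker.h implementation:

#include <atomic>
#include <cassert>

/// Simplified stand-in for ActionBlocker, for illustration only.
class SimpleBlocker
{
public:
    class BlockHolder
    {
    public:
        explicit BlockHolder(std::atomic<int> * counter_ = nullptr) : counter(counter_)
        {
            if (counter)
                ++*counter;
        }
        ~BlockHolder()
        {
            if (counter)
                --*counter;
        }
        BlockHolder(BlockHolder && other) noexcept : counter(other.counter) { other.counter = nullptr; }
    private:
        std::atomic<int> * counter;
    };

    bool isCancelled() const { return counter > 0; }
    BlockHolder cancel() { return BlockHolder(&counter); }   /// blocked only while the holder lives
    void cancelForever() { ++counter; }                      /// blocked until shutdown
    std::atomic<int> & getCounter() { return counter; }      /// shared with copyData and friends

private:
    std::atomic<int> counter{0};
};

int main()
{
    SimpleBlocker blocker;
    {
        auto holder = blocker.cancel();
        assert(blocker.isCancelled());      /// transfers/merges would throw ABORTED here
    }
    assert(!blocker.isCancelled());         /// holder destroyed, work may resume
}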

View File

@ -163,6 +163,9 @@ struct Settings
/** Allows you to select the method of data compression when writing */ \
M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \
\
/** Allows you to select the level of ZSTD compression */ \
M(SettingInt64, network_zstd_compression_level, 1) \
\
/** Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities. */ \
M(SettingUInt64, priority, 0) \
\
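For reference, the ways CompressionSettings objects are built elsewhere in this change; a minimal sketch assuming the constructors shown in TCPHandler.cpp and clickhouse-compressor below (how the Settings-based constructor maps network_zstd_compression_level to a level is not shown in this diff and is assumed):

#include <IO/CompressedStream.h>
#include <IO/CompressionSettings.h>
#include <Interpreters/Settings.h>

/// Illustrative only.
void compressionSettingsExamples(const DB::Settings & user_settings)
{
    /// Explicit method and level, as clickhouse-compressor does with the new --level option.
    DB::CompressionSettings zstd_level_5(DB::CompressionMethod::ZSTD, 5);

    /// Method only; the default level for that method is presumably used.
    DB::CompressionSettings lz4_default(DB::CompressionMethod::LZ4);

    /// From user settings, as TCPHandler does: network_compression_method picks the codec
    /// and, for ZSTD, network_zstd_compression_level is presumably used as the level.
    DB::CompressionSettings for_network(user_settings);

    (void) zstd_level_5;
    (void) lz4_default;
    (void) for_network;
}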

View File

@ -308,13 +308,15 @@ User::User(const String & name_, const String & config_elem, Poco::Util::Abstrac
void Users::loadFromConfig(Poco::Util::AbstractConfiguration & config)
{
cont.clear();
Container new_cont;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys("users", config_keys);
for (const std::string & key : config_keys)
cont.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(key, "users." + key, config));
new_cont.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(key, "users." + key, config));
cont = std::move(new_cont);
}
const User & Users::get(const String & user_name, const String & password, const Poco::Net::IPAddress & address) const

View File

@ -61,6 +61,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("level", "compression level")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
@ -93,6 +94,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else if (use_none)
method = DB::CompressionMethod::NONE;
DB::CompressionSettings settings(method, options.count("level") > 0 ? options["level"].as<int>() : DB::CompressionSettings::getDefaultLevel(method));
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
@ -110,7 +113,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
{
/// Compression
DB::CompressedWriteBuffer to(wb, method, block_size);
DB::CompressedWriteBuffer to(wb, settings, block_size);
DB::copyData(rb, to);
}
}

View File

@ -931,7 +931,7 @@ private:
else
throw DB::Exception("Unknown type " + config_exec_type + " in :" + test_name);
times_to_run = test_config->getUInt("times_to_run");
times_to_run = test_config->getUInt("times_to_run", 1);
stop_conditions_by_run.clear();
TestStopConditions stop_conditions_template;

View File

@ -12,6 +12,7 @@
#include <IO/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/CompressionSettings.h>
#include <IO/copyData.h>
@ -663,7 +664,7 @@ void TCPHandler::initBlockOutput()
{
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, query_context.getSettingsRef().network_compression_method);
*out, CompressionSettings(query_context.getSettingsRef()));
else
state.maybe_compressed_out = out;

View File

@ -8,12 +8,15 @@
<count>10</count>
</logger>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<!-- For HTTPS and SSL over native protocol. -->
<!--
<https_port>8443</https_port>
<tcp_ssl_port>9440</tcp_ssl_port>
-->
<!-- Used with https_port and tcp_ssl_port. Full ssl options list: https://github.com/yandex/ClickHouse/blob/master/contrib/libpoco/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
<openSSL>
<server> <!-- Used for https server AND secure tcp port -->
@ -47,11 +50,6 @@
<http_server_default_response><![CDATA[<html ng-app="SMI2"><head><base href="http://ui.tabix.io/"></head><body><div ui-view="" class="content-ui"></div><script src="http://loader.tabix.io/master.js"></script></body></html>]]></http_server_default_response>
-->
<tcp_port>9000</tcp_port>
<!-- SSL port -->
<tcp_ssl_port>9440</tcp_ssl_port>
<!-- Port for communication between replicas. Used for data exchange. -->
<interserver_http_port>9009</interserver_http_port>
@ -129,7 +127,17 @@
<!-- Configuration of clusters that could be used in Distributed tables.
https://clickhouse.yandex/reference_en.html#Distributed
-->
<remote_servers incl="clickhouse_remote_servers" />
<remote_servers incl="clickhouse_remote_servers" >
<!-- Test only shard config for testing distributed storage -->
<test_shard_localhost>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test_shard_localhost>
</remote_servers>
<!-- If an element has an 'incl' attribute, the corresponding substitution from another file will be used as its value.

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
}
/** Allows you to select the compression method for the conditions specified in the configuration file.
/** Allows you to select the compression settings for the conditions specified in the configuration file.
* The config looks like this
<compression>
@ -29,6 +29,7 @@ namespace ErrorCodes
<!-- Which compression method to choose. -->
<method>zstd</method>
<level>2</level>
</case>
<case>
@ -36,23 +37,23 @@ namespace ErrorCodes
</case>
</compression>
*/
class CompressionMethodSelector
class CompressionSettingsSelector
{
private:
struct Element
{
size_t min_part_size = 0;
double min_part_size_ratio = 0;
CompressionMethod method = CompressionMethod::LZ4;
CompressionSettings settings = CompressionSettings(CompressionMethod::LZ4);
void setMethod(const std::string & name)
static CompressionMethod compressionMethodFromString(const std::string & name)
{
if (name == "lz4")
method = CompressionMethod::LZ4;
return CompressionMethod::LZ4;
else if (name == "zstd")
method = CompressionMethod::ZSTD;
return CompressionMethod::ZSTD;
else if (name == "none")
method = CompressionMethod::NONE;
return CompressionMethod::NONE;
else
throw Exception("Unknown compression method " + name, ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
@ -62,7 +63,9 @@ private:
min_part_size = config.getUInt64(config_prefix + ".min_part_size", 0);
min_part_size_ratio = config.getDouble(config_prefix + ".min_part_size_ratio", 0);
setMethod(config.getString(config_prefix + ".method"));
CompressionMethod method = compressionMethodFromString(config.getString(config_prefix + ".method"));
int level = config.getInt64(config_prefix + ".level", CompressionSettings::getDefaultLevel(method));
settings = CompressionSettings(method, level);
}
bool check(size_t part_size, double part_size_ratio) const
@ -75,9 +78,9 @@ private:
std::vector<Element> elements;
public:
CompressionMethodSelector() {} /// Always returns the default method.
CompressionSettingsSelector() {} /// Always returns the default method.
CompressionMethodSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
CompressionSettingsSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
@ -91,13 +94,13 @@ public:
}
}
CompressionMethod choose(size_t part_size, double part_size_ratio) const
CompressionSettings choose(size_t part_size, double part_size_ratio) const
{
CompressionMethod res = CompressionMethod::LZ4;
CompressionSettings res = CompressionSettings(CompressionMethod::LZ4);
for (const auto & element : elements)
if (element.check(part_size, part_size_ratio))
res = element.method;
res = element.settings;
return res;
}
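A usage sketch for the renamed selector, assuming the constructor and choose() signatures shown above; the thresholds and level in the embedded config are made-up illustration values:

#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <IO/CompressionSettings.h>
#include <Storages/CompressionSettingsSelector.h>

/// Illustrative only: parts matching the <case> get ZSTD level 2, everything else falls back to lz4.
DB::CompressionSettings chooseForPart(size_t part_size, double part_size_ratio)
{
    std::istringstream xml(
        "<yandex>"
        "  <compression>"
        "    <case>"
        "      <min_part_size>10000000000</min_part_size>"
        "      <min_part_size_ratio>0.01</min_part_size_ratio>"
        "      <method>zstd</method>"
        "      <level>2</level>"
        "    </case>"
        "  </compression>"
        "</yandex>");

    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));
    DB::CompressionSettingsSelector selector(*config, "compression");
    return selector.choose(part_size, part_size_ratio);
}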

View File

@ -47,7 +47,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (is_cancelled)
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String part_name = params.get("part");
@ -120,9 +120,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out, is_cancelled);
copyData(file_in, hashing_out, blocker.getCounter());
if (is_cancelled)
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (hashing_out.count() != size)
@ -181,14 +181,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
return fetchPartImpl(part_name, replica_path, host, port, "", to_detached);
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchShardedPart(
const InterserverIOEndpointLocation & location,
const String & part_name,
size_t shard_no)
{
return fetchPartImpl(part_name, location.name, location.host, location.port, toString(shard_no), true);
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
const String & part_name,
const String & replica_path,
@ -241,9 +233,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
WriteBufferFromFile file_out(absolute_part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size, is_cancelled);
copyData(in, hashing_out, file_size, blocker.getCounter());
if (is_cancelled)
if (blocker.isCancelled())
{
/// NOTE It also makes sense to check the cancellation status every time data is read over the network, by polling with a fairly small timeout.
/// For now it is checked only between read chunks (in the `copyData` function).

View File

@ -54,14 +54,8 @@ public:
int port,
bool to_detached = false);
/// Method for resharding. Downloads a sharded part
/// from the specified shard to the `to_detached` folder.
MergeTreeData::MutableDataPartPtr fetchShardedPart(
const InterserverIOEndpointLocation & location,
const String & part_name,
size_t shard_no);
void cancel() { is_cancelled = true; }
/// You need to stop the data transfer.
ActionBlocker blocker;
private:
MergeTreeData::MutableDataPartPtr fetchPartImpl(
@ -74,8 +68,6 @@ private:
private:
MergeTreeData & data;
/// You need to stop the data transfer.
std::atomic<bool> is_cancelled {false};
Logger * log;
};

View File

@ -108,9 +108,6 @@ MergeTreeData::MergeTreeData(
parts_clean_callback(parts_clean_callback_ ? parts_clean_callback_ : [this](){ clearOldParts(); }),
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
checkNoMultidimensionalArrays(*columns, attach);
checkNoMultidimensionalArrays(materialized_columns, attach);
merging_params.check(*columns);
if (!primary_expr_ast && merging_params.mode != MergingParams::Unsorted)
@ -1126,7 +1123,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
auto compression_method = this->context.chooseCompressionMethod(
auto compression_settings = this->context.chooseCompressionSettings(
part->size_in_bytes,
static_cast<double>(part->size_in_bytes) / this->getTotalActiveSizeInBytes());
ExpressionBlockInputStream in(part_in, expression);
@ -1138,7 +1135,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have old name of shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_method, true /* skip_offsets */);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_settings, true /* skip_offsets */);
in.readPrefix();
out.writePrefix();
@ -1732,6 +1729,7 @@ void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String &
{
std::experimental::optional<String> prefix;
String partition_id;
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
const auto & partition = dynamic_cast<const ASTPartition &>(*partition_ast);
@ -1762,24 +1760,23 @@ void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String &
LOG_DEBUG(log, "Snapshot will be placed at " + backup_path);
/// Acquire a snapshot of active data parts to prevent them from being removed while the backup is taken.
const auto data_parts = getDataParts();
size_t parts_processed = 0;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
for (const auto & part : data_parts)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_info, format_version))
continue;
if (prefix)
{
if (!startsWith(part_info.partition_id, prefix.value()))
if (!startsWith(part->info.partition_id, prefix.value()))
continue;
}
else if (part_info.partition_id != partition_id)
else if (part->info.partition_id != partition_id)
continue;
LOG_DEBUG(log, "Freezing part " << it.name());
LOG_DEBUG(log, "Freezing part " << part->name);
String part_absolute_path = it.path().absolute().toString();
String part_absolute_path = Poco::Path(part->getFullPath()).absolute().toString();
if (!startsWith(part_absolute_path, clickhouse_path))
throw Exception("Part path " + part_absolute_path + " is not inside " + clickhouse_path, ErrorCodes::LOGICAL_ERROR);

View File

@ -241,15 +241,17 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
if (!final && parts.size() == 1)
return false;
MergeTreeData::DataPartsVector::const_iterator it = parts.begin();
MergeTreeData::DataPartsVector::const_iterator prev_it = it;
auto it = parts.begin();
auto prev_it = it;
size_t sum_bytes = 0;
while (it != parts.end())
{
if ((it != parts.begin() || parts.size() == 1) /// For the case of one part, we check that it can be merged "with itself".
&& !can_merge(*prev_it, *it))
/// For the case of one part, we check that it can be merged "with itself".
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it))
{
return false;
}
sum_bytes += (*it)->size_in_bytes;
@ -477,7 +479,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
static const String TMP_PREFIX = "tmp_merge_";
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
const MergeTreeData::DataPartsVector & parts = future_part.parts;
@ -617,12 +619,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
if (deduplicate && merged_stream->isGroupedOutput())
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, Limits(), 0 /*limit_hint*/, Names());
auto compression_method = data.context.chooseCompressionMethod(
auto compression_settings = data.context.chooseCompressionSettings(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, merging_columns, compression_method, merged_column_to_size, aio_threshold};
data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, aio_threshold};
merged_stream->readPrefix();
to.writePrefix();
@ -631,7 +633,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
Block block;
while (!isCancelled() && (block = merged_stream->read()))
while (!merges_blocker.isCancelled() && (block = merged_stream->read()))
{
rows_written += block.rows();
to.write(block);
@ -654,7 +656,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merged_stream->readSuffix();
merged_stream.reset();
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
@ -700,7 +702,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_settings, offset_written);
size_t column_elems_written = 0;
column_to.writePrefix();
@ -725,7 +727,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
@ -913,7 +915,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// A very rough estimate for the compressed data size of each sharded partition.
/// Actually it all depends on the properties of the expression for sharding.
UInt64 per_shard_size_bytes_compressed = merge_entry->total_size_bytes_compressed / static_cast<double>(job.paths.size());
auto compression_method = data.context.chooseCompressionMethod(
auto compression_settings = data.context.chooseCompressionSettings(
per_shard_size_bytes_compressed,
static_cast<double>(per_shard_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
@ -947,7 +949,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
MergedBlockOutputStreamPtr output_stream;
output_stream = std::make_unique<MergedBlockOutputStream>(
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold);
data, new_part_tmp_path, column_names_and_types, compression_settings, merged_column_to_size, aio_threshold);
per_shard_data_parts.emplace(shard_no, std::move(data_part));
per_shard_output.emplace(shard_no, std::move(output_stream));
@ -1095,7 +1097,7 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
void MergeTreeDataMerger::abortReshardPartitionIfRequested()
{
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED);
if (cancellation_hook)

View File

@ -4,6 +4,8 @@
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <atomic>
#include <functional>
#include <Common/ActionBlocker.h>
namespace DB
{
@ -110,42 +112,15 @@ private:
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(const String & partition_id);
/** Temporarily cancel merges.
*/
class BlockerImpl
{
public:
BlockerImpl(MergeTreeDataMerger * merger_) : merger(merger_)
{
++merger->cancelled;
}
~BlockerImpl()
{
--merger->cancelled;
}
private:
MergeTreeDataMerger * merger;
};
public:
/** Cancel all merges. All currently running 'mergeParts' methods will throw exception soon.
* All new calls to 'mergeParts' will throw exception till all 'Blocker' objects will be destroyed.
/** Used to cancel all merges. After cancel() is called, all currently running 'mergeParts' calls will soon throw an exception,
* and all new calls to 'mergeParts' will throw an exception until every 'BlockHolder' object has been destroyed.
*/
using Blocker = std::unique_ptr<BlockerImpl>;
Blocker cancel() { return std::make_unique<BlockerImpl>(this); }
/** Cancel all merges forever.
*/
void cancelForever() { ++cancelled; }
bool isCancelled() const { return cancelled > 0; }
public:
ActionBlocker merges_blocker;
enum class MergeAlgorithm
{
Horizontal, /// per-row merge of all columns
Horizontal, /// per-row merge of all columns
Vertical /// per-row merge of PK columns, per-column gather for non-PK columns
};
@ -166,8 +141,6 @@ private:
CancellationHook cancellation_hook;
std::atomic<int> cancelled {0};
void abortReshardPartitionIfRequested();
};

View File

@ -202,7 +202,7 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const
{
writeString("checksums format version: 4\n", to);
CompressedWriteBuffer out{to, CompressionMethod::LZ4, 1 << 16};
CompressedWriteBuffer out{to, CompressionSettings(CompressionMethod::LZ4), 1 << 16};
writeVarUInt(files.size(), out);
for (const auto & it : files)

View File

@ -196,10 +196,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_method = data.context.chooseCompressionMethod(0, 0);
auto compression_settings = data.context.chooseCompressionSettings(0, 0);
NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_method);
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_settings);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);

View File

@ -31,13 +31,13 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionMethod compression_method_,
CompressionSettings compression_settings_,
size_t aio_threshold_)
: storage(storage_),
min_compress_block_size(min_compress_block_size_),
max_compress_block_size(max_compress_block_size_),
aio_threshold(aio_threshold_),
compression_method(compression_method_)
compression_settings(compression_settings_)
{
}
@ -69,7 +69,7 @@ void IMergedBlockOutputStream::addStream(
path + escaped_column_name, NULL_MAP_EXTENSION,
path + escaped_column_name, NULL_MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_method,
compression_settings,
estimated_size,
aio_threshold);
@ -91,7 +91,7 @@ void IMergedBlockOutputStream::addStream(
path + escaped_size_name, DATA_FILE_EXTENSION,
path + escaped_size_name, MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_method,
compression_settings,
estimated_size,
aio_threshold);
}
@ -105,7 +105,7 @@ void IMergedBlockOutputStream::addStream(
path + escaped_column_name, DATA_FILE_EXTENSION,
path + escaped_column_name, MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_method,
compression_settings,
estimated_size,
aio_threshold);
}
@ -114,23 +114,23 @@ void IMergedBlockOutputStream::addStream(
void IMergedBlockOutputStream::writeData(
const String & name,
const IDataType & type,
const IColumn & column,
const DataTypePtr & type,
const ColumnPtr & column,
OffsetColumns & offset_columns,
size_t level,
bool skip_offsets)
{
writeDataImpl(name, type, column, offset_columns, level, false, skip_offsets);
writeDataImpl(name, type, column, nullptr, offset_columns, level, skip_offsets);
}
void IMergedBlockOutputStream::writeDataImpl(
const String & name,
const IDataType & type,
const IColumn & column,
const DataTypePtr & type,
const ColumnPtr & column,
const ColumnPtr & offsets,
OffsetColumns & offset_columns,
size_t level,
bool write_array_data,
bool skip_offsets)
{
/// NOTE: the parameter write_array_data indicates whether we call this method
@ -138,131 +138,124 @@ void IMergedBlockOutputStream::writeDataImpl(
/// serialization of arrays for the MergeTree engine slightly differs from
/// what the other engines do.
size_t size = column.size();
const DataTypeArray * type_arr = nullptr;
if (type.isNullable())
if (type->isNullable())
{
/// First write to the null map.
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType());
const auto & nullable_type = static_cast<const DataTypeNullable &>(*type);
const auto & nested_type = nullable_type.getNestedType();
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(column);
const IColumn & nested_col = *(nullable_col.getNestedColumn());
const auto & nullable_col = static_cast<const ColumnNullable &>(*column);
const auto & nested_col = nullable_col.getNestedColumn();
std::string filename = name + NULL_MAP_EXTENSION;
ColumnStream & stream = *column_streams[filename];
auto null_map_type = std::make_shared<DataTypeUInt8>();
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), stream.compressed, prev_mark, limit);
/// This way that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
}
writeColumn(nullable_col.getNullMapColumn(), null_map_type, stream, offsets);
/// Then write data.
writeDataImpl(name, nested_type, nested_col, offset_columns, level, write_array_data, false);
writeDataImpl(name, nested_type, nested_col, offsets, offset_columns, level, skip_offsets);
}
else if (!write_array_data && ((type_arr = typeid_cast<const DataTypeArray *>(&type)) != nullptr))
else if (auto type_arr = typeid_cast<const DataTypeArray *>(type.get()))
{
/// For arrays, you first need to serialize dimensions, and then values.
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
const auto & column_array = typeid_cast<const ColumnArray &>(*column);
ColumnPtr next_level_offsets;
ColumnPtr lengths_column;
auto offsets_data_type = std::make_shared<DataTypeNumber<ColumnArray::Offset_t>>();
if (offsets)
{
/// Have offsets from prev level. Calculate offsets for next level.
next_level_offsets = offsets->clone();
const auto & array_offsets = column_array.getOffsets();
auto & next_level_offsets_column = typeid_cast<ColumnArray::ColumnOffsets_t &>(*next_level_offsets);
auto & next_level_offsets_data = next_level_offsets_column.getData();
for (auto & offset : next_level_offsets_data)
offset = offset ? array_offsets[offset - 1] : 0;
/// Calculate lengths of arrays and write them as a new array.
lengths_column = column_array.getLengthsColumn();
}
if (!skip_offsets && offset_columns.count(size_name) == 0)
{
offset_columns.insert(size_name);
ColumnStream & stream = *column_streams[size_name];
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// If there is `index_offset`, the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);
/// This way that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
}
if (offsets)
writeColumn(lengths_column, offsets_data_type, stream, offsets);
else
writeColumn(column, type, stream, nullptr);
}
if (type_arr->getNestedType()->isNullable())
writeDataImpl(name, *type_arr->getNestedType(),
typeid_cast<const ColumnArray &>(column).getData(), offset_columns,
level + 1, true, false);
else
writeDataImpl(name, type, column, offset_columns, level + 1, true, false);
writeDataImpl(name, type_arr->getNestedType(), column_array.getDataPtr(),
offsets ? next_level_offsets : column_array.getOffsetsColumn(),
offset_columns, level + 1, skip_offsets);
}
else
{
ColumnStream & stream = *column_streams[name];
writeColumn(column, type, stream, offsets);
}
}
size_t prev_mark = 0;
while (prev_mark < size)
void IMergedBlockOutputStream::writeColumn(
const ColumnPtr & column,
const DataTypePtr & type,
IMergedBlockOutputStream::ColumnStream & stream,
ColumnPtr offsets)
{
std::shared_ptr<DataTypeArray> array_type_holder;
DataTypeArray * array_type;
ColumnPtr array_column;
if (offsets)
{
array_type_holder = std::make_shared<DataTypeArray>(type);
array_type = array_type_holder.get();
array_column = std::make_shared<ColumnArray>(column, offsets);
}
else
array_type = typeid_cast<DataTypeArray *>(type.get());
size_t size = offsets ? offsets->size() : column->size();
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
size_t limit = 0;
limit = storage.index_granularity;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type.serializeBinaryBulk(column, stream.compressed, prev_mark, limit);
/// So that marks point to the beginning of the next compressed block rather than to the end of the current one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
if (offsets)
array_type->serializeBinaryBulk(*array_column, stream.compressed, prev_mark, limit);
else if (array_type)
array_type->serializeOffsets(*column, stream.compressed, prev_mark, limit);
else
type->serializeBinaryBulk(*column, stream.compressed, prev_mark, limit);
/// So that marks point to the beginning of the next compressed block rather than to the end of the current one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
}
}
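The offsets-to-lengths conversion used above (getLengthsColumn and the next_level_offsets recalculation) is easiest to see on a small example. The following standalone sketch is illustration only and does not use the ClickHouse column classes:

#include <cassert>
#include <cstdint>
#include <vector>

int main()
{
    /// ClickHouse array columns store cumulative offsets, one per row:
    /// rows [10, 20], [], [30, 40, 50]  ->  offsets {2, 2, 5}.
    std::vector<uint64_t> offsets{2, 2, 5};

    /// getLengthsColumn(): the length of row i is offsets[i] - offsets[i - 1].
    std::vector<uint64_t> lengths(offsets.size());
    for (size_t i = 0; i < offsets.size(); ++i)
        lengths[i] = offsets[i] - (i == 0 ? 0 : offsets[i - 1]);
    assert((lengths == std::vector<uint64_t>{2, 0, 3}));

    /// Translating outer offsets one nesting level down, as writeDataImpl does above:
    /// an outer offset of N nested arrays becomes the inner offset of the N-th nested array,
    /// i.e. offset = offset ? inner_offsets[offset - 1] : 0.
    std::vector<uint64_t> outer_offsets{1, 3};      /// 2 outer rows containing 1 and 2 nested arrays
    std::vector<uint64_t> inner_offsets{2, 2, 5};   /// offsets of the 3 nested arrays
    std::vector<uint64_t> next_level = outer_offsets;
    for (auto & offset : next_level)
        offset = offset ? inner_offsets[offset - 1] : 0;
    assert((next_level == std::vector<uint64_t>{2, 5})); /// element boundaries of the 2 outer rows
}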
@ -276,14 +269,14 @@ IMergedBlockOutputStream::ColumnStream::ColumnStream(
const std::string & marks_path,
const std::string & marks_file_extension_,
size_t max_compress_block_size,
CompressionMethod compression_method,
CompressionSettings compression_settings,
size_t estimated_size,
size_t aio_threshold) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_method), compressed(compressed_buf),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_settings), compressed(compressed_buf),
marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}
@ -322,10 +315,10 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method)
CompressionSettings compression_settings)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
columns_list(columns_list_), part_path(part_path_)
{
@ -338,12 +331,12 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method,
CompressionSettings compression_settings,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().max_compress_block_size, compression_settings,
aio_threshold_),
columns_list(columns_list_), part_path(part_path_)
{
@ -514,18 +507,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto primary_column_it = primary_columns_name_to_position.find(it.name);
if (primary_columns_name_to_position.end() != primary_column_it)
{
writeData(column.name, *column.type, *primary_columns[primary_column_it->second].column, offset_columns, 0, false);
writeData(column.name, column.type, primary_columns[primary_column_it->second].column, offset_columns, 0, false);
}
else
{
/// Columns that are not part of the primary key are permuted here; the result is then released to save RAM.
ColumnPtr permutted_column = column.column->permute(*permutation, 0);
writeData(column.name, *column.type, *permutted_column, offset_columns, 0, false);
writeData(column.name, column.type, permutted_column, offset_columns, 0, false);
}
}
else
{
writeData(column.name, *column.type, *column.column, offset_columns, 0, false);
writeData(column.name, column.type, column.column, offset_columns, 0, false);
}
}
@ -563,10 +556,10 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// Implementation of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_)
MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
{
@ -591,7 +584,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, offset_columns, 0, skip_offsets);
writeData(column.name, column.type, column.column, offset_columns, 0, skip_offsets);
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;

View File

@ -20,7 +20,7 @@ public:
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionMethod compression_method_,
CompressionSettings compression_settings_,
size_t aio_threshold_);
protected:
@ -35,7 +35,7 @@ protected:
const std::string & marks_path,
const std::string & marks_file_extension_,
size_t max_compress_block_size,
CompressionMethod compression_method,
CompressionSettings compression_settings,
size_t estimated_size,
size_t aio_threshold);
@ -66,8 +66,8 @@ protected:
size_t level, const String & filename, bool skip_offsets);
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
size_t level, bool skip_offsets);
void writeData(const String & name, const DataTypePtr & type, const ColumnPtr & column,
OffsetColumns & offset_columns, size_t level, bool skip_offsets);
MergeTreeData & storage;
@ -81,12 +81,15 @@ protected:
size_t aio_threshold;
CompressionMethod compression_method;
CompressionSettings compression_settings;
private:
/// Internal version of writeData.
void writeDataImpl(const String & name, const IDataType & type, const IColumn & column,
OffsetColumns & offset_columns, size_t level, bool write_array_data, bool skip_offsets);
void writeDataImpl(const String & name, const DataTypePtr & type, const ColumnPtr & column,
const ColumnPtr & offsets, OffsetColumns & offset_columns, size_t level, bool skip_offsets);
/// Writes column data into stream.
/// If type is Array, writes offsets only. To write array data, unpack array column and use offsets argument.
void writeColumn(const ColumnPtr & column, const DataTypePtr & type, ColumnStream & stream, ColumnPtr offsets);
};
@ -100,13 +103,13 @@ public:
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method);
CompressionSettings compression_settings);
MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method,
CompressionSettings compression_settings,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_);
@ -155,7 +158,7 @@ class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
{
public:
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_);
MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_);
void write(const Block & block) override;
void writeSuffix() override;

View File

@ -39,7 +39,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (is_cancelled)
if (blocker.isCancelled())
throw Exception{"RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED};
size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(context.getPath());

View File

@ -39,7 +39,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (is_cancelled)
if (blocker.isCancelled())
throw Exception{"RemoteQueryExecutor service terminated", ErrorCodes::ABORTED};
std::string query = params.get("query");

View File

@ -67,10 +67,10 @@ void ReplicatedMergeTreeAlterThread::run()
{
/// If the table structure needs to be locked, merges are suspended first.
MergeTreeDataMerger::Blocker merge_blocker;
ActionBlocker::BlockHolder merge_blocker;
if (changed_version || force_recheck_parts)
merge_blocker = storage.merger.cancel();
merge_blocker = storage.merger.merges_blocker.cancel();
MergeTreeData::DataParts parts;
@ -80,6 +80,14 @@ void ReplicatedMergeTreeAlterThread::run()
/// Temporarily cancel part checks to avoid locking for a long time.
auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
/// Temporarily cancel parts sending
ActionBlocker::BlockHolder data_parts_exchange_blocker;
if (storage.data_parts_exchange_endpoint_holder)
data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
/// Temporarily cancel part fetches
auto fetches_blocker = storage.fetcher.blocker.cancel();
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);

View File

@ -221,8 +221,11 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeperPt
}
auto not_cached_blocks = stat.numChildren - cached_block_stats->size();
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
<< " to clear old ones from ZooKeeper. This might take several minutes.");
if (not_cached_blocks)
{
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
<< " to clear old ones from ZooKeeper. This might take several minutes.");
}
std::vector<std::pair<String, zkutil::ZooKeeper::ExistsFuture>> exists_futures;
for (const String & block : blocks)

View File

@ -594,7 +594,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part->size_in_bytes;
}
if (merger.isCancelled())
if (merger.merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges are cancelled now.";
LOG_DEBUG(log, reason);

View File

@ -160,22 +160,22 @@ void ReplicatedMergeTreeRestartingThread::run()
try
{
storage.endpoint_holder->cancel();
storage.endpoint_holder = nullptr;
storage.data_parts_exchange_endpoint_holder->cancelForever();
storage.data_parts_exchange_endpoint_holder = nullptr;
storage.disk_space_monitor_endpoint_holder->cancel();
storage.disk_space_monitor_endpoint_holder->cancelForever();
storage.disk_space_monitor_endpoint_holder = nullptr;
storage.sharded_partition_uploader_endpoint_holder->cancel();
storage.sharded_partition_uploader_endpoint_holder->cancelForever();
storage.sharded_partition_uploader_endpoint_holder = nullptr;
storage.remote_query_executor_endpoint_holder->cancel();
storage.remote_query_executor_endpoint_holder->cancelForever();
storage.remote_query_executor_endpoint_holder = nullptr;
storage.remote_part_checker_endpoint_holder->cancel();
storage.remote_part_checker_endpoint_holder->cancelForever();
storage.remote_part_checker_endpoint_holder = nullptr;
storage.merger.cancelForever();
storage.merger.merges_blocker.cancelForever();
partialShutdown();

View File

@ -82,9 +82,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
WriteBufferFromFile file_out{absolute_part_path + file_name};
HashingWriteBuffer hashing_out{file_out};
copyData(body, hashing_out, file_size, is_cancelled);
copyData(body, hashing_out, file_size, blocker.getCounter());
if (is_cancelled)
if (blocker.isCancelled())
{
part_file.remove(true);
throw Exception{"Fetching of part was cancelled", ErrorCodes::ABORTED};
@ -96,8 +96,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
if (expected_hash != hashing_out.getHash())
throw Exception{"Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path};
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
if (file_name != "checksums.txt" && file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}

View File

@ -168,7 +168,7 @@ private:
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionMethod::LZ4, max_compress_block_size)
compressed(plain, CompressionSettings(CompressionMethod::LZ4), max_compress_block_size)
{
plain_offset = Poco::File(data_path).getSize();
}

View File

@ -91,7 +91,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelForever();
merger.merges_blocker.cancelForever();
if (merge_task_handle)
background_pool.removeTask(merge_task_handle);
}
@ -151,7 +151,7 @@ void StorageMergeTree::alter(
const Context & context)
{
/// NOTE: Here, as in ReplicatedMergeTree, an ALTER can be performed that does not block data writes for a long time.
auto merge_blocker = merger.cancel();
auto merge_blocker = merger.merges_blocker.cancel();
auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);
@ -337,7 +337,7 @@ bool StorageMergeTree::merge(
Stopwatch stopwatch;
auto new_part = merger.mergePartsToTemporaryPart(
future_part, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get(), deduplicate);
future_part, *merge_entry_ptr, aio_threshold, time(nullptr), merging_tagger->reserved_space.get(), deduplicate);
merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
@ -402,7 +402,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
{
/// Asks running merges to finish and does not allow new ones to start.
/// This protects against the "revival" of data for a removed partition after a merge completes.
auto merge_blocker = merger.cancel();
auto merge_blocker = merger.merges_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
@ -462,7 +462,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partit
{
/// Asks running merges to finish and does not allow new ones to start.
/// This protects against the "revival" of data for a removed partition after a merge completes.
auto merge_blocker = merger.cancel();
auto merge_blocker = merger.merges_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter(__PRETTY_FUNCTION__);

View File

@ -335,7 +335,7 @@ StoragePtr StorageReplicatedMergeTree::create(
{
{
InterserverIOEndpointPtr endpoint = std::make_shared<DataPartsExchange::Service>(res->data, res_ptr);
res->endpoint_holder = get_endpoint_holder(endpoint);
res->data_parts_exchange_endpoint_holder = get_endpoint_holder(endpoint);
}
/// Services for resharding.
@ -1887,10 +1887,10 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
{
all_in_zk = false;
if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(0))
if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
{
LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
<< " with age " << (time(0) - part->modification_time)
<< " with age " << (time(nullptr) - part->modification_time)
<< " seconds exists locally but not in ZooKeeper."
<< " Won't do merge with that part and will check it.");
enqueuePartForCheck(part->name);
@ -1900,7 +1900,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
if (!all_in_zk)
return false;
LogEntry entry;
ReplicatedMergeTreeLogEntryData entry;
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
@ -1975,6 +1975,13 @@ void StorageReplicatedMergeTree::becomeLeader()
if (shutdown_called)
return;
if (merge_selecting_thread.joinable())
{
LOG_INFO(log, "Deleting old leader");
is_leader_node = false; /// signal the merge selecting thread to exit
merge_selecting_thread.join();
}
LOG_INFO(log, "Became leader");
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
@ -2275,7 +2282,7 @@ void StorageReplicatedMergeTree::shutdown()
* Because restarting_thread will wait for the tasks in the background pool to finish,
* and parts are fetched inside those tasks.
*/
fetcher.cancel();
fetcher.blocker.cancelForever();
if (restarting_thread)
{
@ -2283,36 +2290,36 @@ void StorageReplicatedMergeTree::shutdown()
restarting_thread.reset();
}
if (endpoint_holder)
if (data_parts_exchange_endpoint_holder)
{
endpoint_holder->cancel();
endpoint_holder = nullptr;
data_parts_exchange_endpoint_holder->cancelForever();
data_parts_exchange_endpoint_holder = nullptr;
}
if (disk_space_monitor_endpoint_holder)
{
disk_space_monitor_endpoint_holder->cancel();
disk_space_monitor_endpoint_holder->cancelForever();
disk_space_monitor_endpoint_holder = nullptr;
}
disk_space_monitor_client.cancel();
if (sharded_partition_uploader_endpoint_holder)
{
sharded_partition_uploader_endpoint_holder->cancel();
sharded_partition_uploader_endpoint_holder->cancelForever();
sharded_partition_uploader_endpoint_holder = nullptr;
}
sharded_partition_uploader_client.cancel();
if (remote_query_executor_endpoint_holder)
{
remote_query_executor_endpoint_holder->cancel();
remote_query_executor_endpoint_holder->cancelForever();
remote_query_executor_endpoint_holder = nullptr;
}
remote_query_executor_client.cancel();
if (remote_part_checker_endpoint_holder)
{
remote_part_checker_endpoint_holder->cancel();
remote_part_checker_endpoint_holder->cancelForever();
remote_part_checker_endpoint_holder = nullptr;
}
}
@ -2442,7 +2449,10 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
}
if (!selected)
{
LOG_INFO(log, "Cannot select parts for optimization");
return false;
}
if (!createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
return false;

View File

@ -258,7 +258,7 @@ private:
bool is_leader_node = false;
std::mutex leader_node_mutex;
InterserverIOEndpointHolderPtr endpoint_holder;
InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
InterserverIOEndpointHolderPtr disk_space_monitor_endpoint_holder;
InterserverIOEndpointHolderPtr sharded_partition_uploader_endpoint_holder;
InterserverIOEndpointHolderPtr remote_query_executor_endpoint_holder;

View File

@ -117,7 +117,7 @@ public:
explicit StripeLogBlockOutputStream(StorageStripeLog & storage_)
: storage(storage_), lock(storage.rwlock),
data_out_compressed(storage.full_path() + "data.bin", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
data_out(data_out_compressed, CompressionMethod::LZ4, storage.max_compress_block_size),
data_out(data_out_compressed, CompressionSettings(CompressionMethod::LZ4), storage.max_compress_block_size),
index_out_compressed(storage.full_path() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
index_out(index_out_compressed),
block_out(data_out, 0, &index_out, Poco::File(storage.full_path() + "data.bin").getSize())

View File

@ -126,7 +126,7 @@ private:
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionMethod::LZ4, max_compress_block_size)
compressed(plain, CompressionSettings(CompressionMethod::LZ4), max_compress_block_size)
{
}

View File

@ -22,10 +22,10 @@ StorageSystemTables::StorageSystemTables(const std::string & name_)
: name(name_),
columns
{
{"database", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"metadata_modification_time", std::make_shared<DataTypeDateTime>()}
{"database", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"metadata_modification_time", std::make_shared<DataTypeDateTime>()}
}
{
}

View File

@ -90,10 +90,9 @@ def main(args):
failures_total = 0
# Keep same default values as in queries/0_stateless/00000_sh_lib.sh
os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
os.environ.setdefault("CLICKHOUSE_BINARY", "clickhouse")
os.environ.setdefault("CLICKHOUSE_URL", "http://localhost:8123/")
os.environ.setdefault("CLICKHOUSE_CONFIG", "/etc/clickhouse-server/config.xml")
for suite in sorted(os.listdir(base_dir)):
if SERVER_DIED:
@ -266,7 +265,8 @@ def main(args):
if __name__ == '__main__':
parser = ArgumentParser(description = 'ClickHouse functional tests')
parser.add_argument('-q', '--queries', default = 'queries', help = 'Path to queries dir')
parser.add_argument('-c', '--client', default = 'clickhouse-client', help = 'Client program')
parser.add_argument('-b', '--binary', default = 'clickhouse', help = 'Main clickhouse binary')
parser.add_argument('-c', '--client', help = 'Client program')
parser.add_argument('-o', '--output', help = 'Output xUnit compliant test report directory')
parser.add_argument('-t', '--timeout', type = int, default = 600, help = 'Timeout for each test case in seconds')
parser.add_argument('test', nargs = '?', help = 'Optional test case name regex')
@ -281,14 +281,6 @@ if __name__ == '__main__':
group.add_argument('--no-shard', action = 'store_false', default = None, dest = 'shard', help = 'Do not run shard related tests')
args = parser.parse_args()
if args.client is None:
args.client = args.binary + ' client'
main(args)
#
# TODO:
# Pass variables to tests. currently used values:
# /etc/clickhouse-server/config.xml
# clickhouse (path to binary)
# clickhouse-client (path to binary)
# http://localhost:8123/
#
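As a usage sketch (the script path and binary location are assumptions, not taken from this diff), the runner can now be pointed at a single ``clickhouse`` binary, and the client command is derived from it when ``--client`` is not given explicitly:

    ./clickhouse-test --binary /usr/bin/clickhouse 00506
    # equivalent to: ./clickhouse-test --client "/usr/bin/clickhouse client" 00506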

View File

@ -1,2 +1,5 @@
1
1
0 0
0 0 0
0 0 0

View File

@ -3,3 +3,7 @@ CREATE TABLE test.nested (n Nested(x UInt8)) ENGINE = Memory;
INSERT INTO test.nested VALUES ([1, 2]);
SELECT 1 AS x FROM remote('127.0.0.1', test.nested) ARRAY JOIN n.x;
DROP TABLE test.nested;
SELECT dummy AS dummy, dummy AS b FROM system.one;
SELECT dummy AS dummy, dummy AS b, b AS c FROM system.one;
SELECT b AS c, dummy AS b, dummy AS dummy FROM system.one;

View File

@ -1,10 +1,11 @@
#!/usr/bin/env bash
CLICKHOUSE_URL=${CLICKHOUSE_URL:=http://localhost:8123/}
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
( curl -s --head "${CLICKHOUSE_URL}?query=SELECT%201";
curl -s --head "${CLICKHOUSE_URL}?query=select+*+from+system.numbers+limit+1000000" ) | grep -v "Date:"
( ${CLICKHOUSE_CURL} -s --head "${CLICKHOUSE_URL}?query=SELECT%201";
${CLICKHOUSE_CURL} -s --head "${CLICKHOUSE_URL}?query=select+*+from+system.numbers+limit+1000000" ) | grep -v "Date:"
if [[ `curl -sS -X POST -I "${CLICKHOUSE_URL}?query=SELECT+1" | grep -c '411 Length Required'` -ne 1 ]]; then
if [[ `${CLICKHOUSE_CURL} -sS -X POST -I "${CLICKHOUSE_URL}?query=SELECT+1" | grep -c '411 Length Required'` -ne 1 ]]; then
echo FAIL
fi

View File

@ -0,0 +1 @@
server still alive

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
# https://github.com/yandex/ClickHouse/issues/1300
clickhouse-client -q "DROP TABLE IF EXISTS test.advertiser";
clickhouse-client -q "DROP TABLE IF EXISTS test.advertiser_test";
clickhouse-client -q "CREATE TABLE test.advertiser ( action_date Date, adblock UInt8, imps Int64 ) Engine = SummingMergeTree( action_date, ( adblock ), 8192, ( imps ) )";
clickhouse-client -q "CREATE TABLE test.advertiser_test ( action_date Date, adblock UInt8, imps Int64, Hash UInt64 ) Engine = SummingMergeTree( action_date, ( adblock, Hash ), 8192, ( imps ) )";
# This INSERT is expected to fail; that's OK - the test only checks that the server stays alive.
clickhouse-client -q "INSERT INTO test.advertiser_test SELECT *, sipHash64( CAST(adblock AS String) ), CAST(1 AS Int8) FROM test.advertiser;" 2>/dev/null
clickhouse-client -q "DROP TABLE test.advertiser";
clickhouse-client -q "DROP TABLE test.advertiser_test";
clickhouse-client -q "SELECT 'server still alive'";

View File

@ -0,0 +1,60 @@
2017-10-02 [0,42]
2017-10-02 [1,42]
2017-10-02 [2,42]
2017-10-02 [3,42]
2017-10-02 [4,42]
2017-10-02 [5,42]
2017-10-02 [6,42]
2017-10-02 [7,42]
2017-10-02 [8,42]
2017-10-02 [9,42]
2017-10-02 \N
2017-10-02 1
2017-10-02 \N
2017-10-02 3
2017-10-02 \N
2017-10-02 5
2017-10-02 \N
2017-10-02 7
2017-10-02 \N
2017-10-02 9
2017-10-02 [NULL,0,NULL]
2017-10-02 [1,1,NULL]
2017-10-02 [NULL,2,NULL]
2017-10-02 [3,3,NULL]
2017-10-02 [NULL,4,NULL]
2017-10-02 [5,5,NULL]
2017-10-02 [NULL,6,NULL]
2017-10-02 [7,7,NULL]
2017-10-02 [NULL,8,NULL]
2017-10-02 [9,9,NULL]
2017-10-02 [[0],[1,2]]
2017-10-02 [[1],[2,3]]
2017-10-02 [[2],[3,4]]
2017-10-02 [[3],[4,5]]
2017-10-02 [[4],[5,6]]
2017-10-02 [[5],[6,7]]
2017-10-02 [[6],[7,8]]
2017-10-02 [[7],[8,9]]
2017-10-02 [[8],[9,10]]
2017-10-02 [[9],[10,11]]
2017-10-02 [[1,NULL,0],[3,NULL,0]]
2017-10-02 [[1,NULL,1],[3,NULL,1]]
2017-10-02 [[1,NULL,2],[3,NULL,2]]
2017-10-02 [[1,NULL,3],[3,NULL,3]]
2017-10-02 [[1,NULL,4],[3,NULL,4]]
2017-10-02 [[1,NULL,5],[3,NULL,5]]
2017-10-02 [[1,NULL,6],[3,NULL,6]]
2017-10-02 [[1,NULL,7],[3,NULL,7]]
2017-10-02 [[1,NULL,8],[3,NULL,8]]
2017-10-02 [[1,NULL,9],[3,NULL,9]]
2017-10-02 [[[0]],[[1],[2,3]]]
2017-10-02 [[[1]],[[2],[3,4]]]
2017-10-02 [[[2]],[[3],[4,5]]]
2017-10-02 [[[3]],[[4],[5,6]]]
2017-10-02 [[[4]],[[5],[6,7]]]
2017-10-02 [[[5]],[[6],[7,8]]]
2017-10-02 [[[6]],[[7],[8,9]]]
2017-10-02 [[[7]],[[8],[9,10]]]
2017-10-02 [[[8]],[[9],[10,11]]]
2017-10-02 [[[9]],[[10],[11,12]]]

View File

@ -0,0 +1,37 @@
create database if not exists test;
drop table if exists test.test_ins_arr;
create table test.test_ins_arr (date Date, val Array(UInt64)) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr select toDate('2017-10-02'), [number, 42] from system.numbers limit 10000;
select * from test.test_ins_arr limit 10;
drop table test.test_ins_arr;
drop table if exists test.test_ins_null;
create table test.test_ins_null (date Date, val Nullable(UInt64)) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_null select toDate('2017-10-02'), if(number % 2, number, Null) from system.numbers limit 10000;
select * from test.test_ins_null limit 10;
drop table test.test_ins_null;
drop table if exists test.test_ins_arr_null;
create table test.test_ins_arr_null (date Date, val Array(Nullable(UInt64))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_null select toDate('2017-10-02'), [if(number % 2, number, Null), number, Null] from system.numbers limit 10000;
select * from test.test_ins_arr_null limit 10;
drop table test.test_ins_arr_null;
drop table if exists test.test_ins_arr_arr;
create table test.test_ins_arr_arr (date Date, val Array(Array(UInt64))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_arr select toDate('2017-10-02'), [[number],[number + 1, number + 2]] from system.numbers limit 10000;
select * from test.test_ins_arr_arr limit 10;
drop table test.test_ins_arr_arr;
drop table if exists test.test_ins_arr_arr_null;
create table test.test_ins_arr_arr_null (date Date, val Array(Array(Nullable(UInt64)))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_arr_null select toDate('2017-10-02'), [[1, Null, number], [3, Null, number]] from system.numbers limit 10000;
select * from test.test_ins_arr_arr_null limit 10;
drop table test.test_ins_arr_arr_null;
drop table if exists test.test_ins_arr_arr_arr;
create table test.test_ins_arr_arr_arr (date Date, val Array(Array(Array(UInt64)))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_arr_arr select toDate('2017-10-02'), [[[number]],[[number + 1], [number + 2, number + 3]]] from system.numbers limit 10000;
select * from test.test_ins_arr_arr_arr limit 10;
drop table test.test_ins_arr_arr_arr;

View File

@ -2,13 +2,15 @@
# Not default server config needed
tcp_ssl_port=`clickhouse extract-from-config -c /etc/clickhouse-server/config.xml -k tcp_ssl_port 2>/dev/null`
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
tcp_ssl_port=`${CLICKHOUSE_EXTRACT_CONFIG} -k tcp_ssl_port 2>/dev/null`
if [ -z ${tcp_ssl_port} ]; then
# Secure port is disabled; produce the expected output without running the queries.
echo 1
echo 2
cat $CURDIR/00505_tcp_ssl.reference
else
# Auto port detect
clickhouse-client --ssl -q "SELECT 1";
clickhouse-client --ssl --port=9440 -q "SELECT 2";
${CLICKHOUSE_CLIENT} --ssl -q "SELECT 1";
${CLICKHOUSE_CLIENT} --ssl --port=9440 -q "SELECT 2";
fi

View File

@ -0,0 +1,10 @@
1
3
34
NOW okay =========================:
34
34
NOW BAD ==========================:
34
34
finish ===========================;

View File

@ -0,0 +1,17 @@
SELECT X FROM (SELECT * FROM (SELECT 1 AS X, 2 AS Y) UNION ALL SELECT 3, 4) ORDER BY X;
DROP TABLE IF EXISTS test.globalin;
CREATE TABLE test.globalin (CounterID UInt32, StartDate Date ) ENGINE = Memory;
INSERT INTO test.globalin VALUES (34, toDate('2017-10-02')), (42, toDate('2017-10-02')), (55, toDate('2017-10-01'));
SELECT * FROM ( SELECT CounterID FROM remote('localhost', 'test', 'globalin') WHERE (CounterID GLOBAL IN ( SELECT toUInt32(34))) GROUP BY CounterID);
SELECT 'NOW okay =========================:';
SELECT CounterID FROM remote('127.0.0.1', 'test', 'globalin') WHERE (CounterID GLOBAL IN ( SELECT toUInt32(34) )) GROUP BY CounterID UNION ALL SELECT CounterID FROM remote('localhost', 'test', 'globalin') WHERE (CounterID GLOBAL IN ( SELECT toUInt32(34))) GROUP BY CounterID;
SELECT 'NOW BAD ==========================:';
SELECT * FROM ( SELECT CounterID FROM remote('127.0.0.1', 'test', 'globalin') WHERE (CounterID GLOBAL IN ( SELECT toUInt32(34) )) GROUP BY CounterID UNION ALL SELECT CounterID FROM remote('localhost', 'test', 'globalin') WHERE (CounterID GLOBAL IN ( SELECT toUInt32(34))) GROUP BY CounterID);
SELECT 'finish ===========================;';
DROP TABLE test.globalin;

View File

@ -0,0 +1,16 @@
3 8
23 48
33 68
13 28
3 8
23 48
33 68
13 28
3 8
23 48
33 68
13 28
3 8
23 48
33 68
13 28

View File

@ -0,0 +1,24 @@
-- https://github.com/yandex/ClickHouse/issues/1059
DROP TABLE IF EXISTS test.union1;
DROP TABLE IF EXISTS test.union2;
DROP TABLE IF EXISTS test.union3;
CREATE TABLE test.union1 ( date Date, a Int32, b Int32, c Int32, d Int32) ENGINE = MergeTree(date, (a, date), 8192);
CREATE TABLE test.union2 ( date Date, a Int32, b Int32, c Int32, d Int32) ENGINE = Distributed(test_shard_localhost, 'test', 'union1');
CREATE TABLE test.union3 ( date Date, a Int32, b Int32, c Int32, d Int32) ENGINE = Distributed(test_shard_localhost, 'test', 'union2');
INSERT INTO test.union1 VALUES (1, 2, 3, 4, 5);
INSERT INTO test.union1 VALUES (11,12,13,14,15);
INSERT INTO test.union2 VALUES (21,22,23,24,25);
INSERT INTO test.union3 VALUES (31,32,33,34,35);
select b, sum(c) from ( select a, b, sum(c) as c from test.union2 where a>1 group by a,b UNION ALL select a, b, sum(c) as c from test.union2 where b>1 group by a, b ) as a group by b;
select b, sum(c) from ( select a, b, sum(c) as c from test.union1 where a>1 group by a,b UNION ALL select a, b, sum(c) as c from test.union2 where b>1 group by a, b ) as a group by b;
select b, sum(c) from ( select a, b, sum(c) as c from test.union1 where a>1 group by a,b UNION ALL select a, b, sum(c) as c from test.union1 where b>1 group by a, b ) as a group by b;
select b, sum(c) from ( select a, b, sum(c) as c from test.union2 where a>1 group by a,b UNION ALL select a, b, sum(c) as c from test.union3 where b>1 group by a, b ) as a group by b;
DROP TABLE test.union1;
DROP TABLE test.union2;
DROP TABLE test.union3;

View File

@ -0,0 +1,13 @@
export CLICKHOUSE_BINARY=${CLICKHOUSE_BINARY:="clickhouse"}
export CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT:="${CLICKHOUSE_BINARY} client"}
export CLICKHOUSE_HOST=${CLICKHOUSE_HOST:="localhost"}
export CLICKHOUSE_PORT_TCP=${CLICKHOUSE_PORT_TCP:="9000"}
export CLICKHOUSE_PORT_HTTP=${CLICKHOUSE_PORT_HTTP:="8123"}
export CLICKHOUSE_PORT_HTTPS=${CLICKHOUSE_PORT_HTTPS:="8443"}
export CLICKHOUSE_PORT_HTTP_PROTO=${CLICKHOUSE_PORT_HTTP_PROTO:="http"}
export CLICKHOUSE_URL=${CLICKHOUSE_URL:="${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTP}/"}
export CLICKHOUSE_CONFIG=${CLICKHOUSE_CONFIG:="/etc/clickhouse-server/config.xml"}
export CLICKHOUSE_EXTRACT_CONFIG=${CLICKHOUSE_EXTRACT_CONFIG:="$CLICKHOUSE_BINARY extract-from-config -c $CLICKHOUSE_CONFIG"}
export CLICKHOUSE_CURL=${CLICKHOUSE_CURL:="curl --max-time 5"}
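Each of these variables can be overridden from the environment; a shell test then only needs to source this file, as the updated tests above do. A minimal sketch:

    #!/usr/bin/env bash
    CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
    . $CURDIR/../shell_config.sh

    ${CLICKHOUSE_CLIENT} -q "SELECT 1"
    ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?query=SELECT%201"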

View File

@ -4,81 +4,67 @@ set -e
CLICKHOUSE_USER=clickhouse
CLICKHOUSE_GROUP=${CLICKHOUSE_USER}
CLICKHOUSE_DATADIR=/var/lib/clickhouse
CLICKHOUSE_DATADIR_OLD=/opt/clickhouse # remove after 2017-06-01
CLICKHOUSE_LOGDIR=/var/log/clickhouse-server
CLICKHOUSE_SERVER_ETCDIR=/etc/clickhouse-server
if [ "$1" = configure ]; then
if [ -x "/etc/init.d/clickhouse-server" ]; then
update-rc.d clickhouse-server defaults 19 19 >/dev/null || exit $?
fi
if [ -x "/etc/init.d/clickhouse-server" ]; then
update-rc.d clickhouse-server defaults 19 19 >/dev/null || exit $?
fi
# Make sure the administrative user exists
if ! getent passwd ${CLICKHOUSE_USER} > /dev/null; then
adduser --system --disabled-login --no-create-home --home /nonexistent \
--shell /bin/false --group --gecos "Clickhouse server" clickhouse > /dev/null
fi
# Make sure the administrative user exists
if ! getent passwd ${CLICKHOUSE_USER} > /dev/null; then
adduser --system --disabled-login --no-create-home --home /nonexistent \
--shell /bin/false --group --gecos "Clickhouse server" clickhouse > /dev/null
fi
# if the user was created manually, make sure the group is there as well
if ! getent group ${CLICKHOUSE_GROUP} > /dev/null; then
addgroup --system ${CLICKHOUSE_GROUP} > /dev/null
fi
# if the user was created manually, make sure the group is there as well
if ! getent group ${CLICKHOUSE_GROUP} > /dev/null; then
addgroup --system ${CLICKHOUSE_GROUP} > /dev/null
fi
# make sure user is in the correct group
if ! id -Gn ${CLICKHOUSE_USER} | grep -qw ${CLICKHOUSE_USER}; then
adduser ${CLICKHOUSE_USER} ${CLICKHOUSE_GROUP} > /dev/null
fi
# make sure user is in the correct group
if ! id -Gn ${CLICKHOUSE_USER} | grep -qw ${CLICKHOUSE_USER}; then
adduser ${CLICKHOUSE_USER} ${CLICKHOUSE_GROUP} > /dev/null
fi
# check validity of user and group
if [ "`id -u ${CLICKHOUSE_USER}`" -eq 0 ]; then
echo "The ${CLICKHOUSE_USER} system user must not have uid 0 (root).
# check validity of user and group
if [ "`id -u ${CLICKHOUSE_USER}`" -eq 0 ]; then
echo "The ${CLICKHOUSE_USER} system user must not have uid 0 (root).
Please fix this and reinstall this package." >&2
exit 1
fi
exit 1
fi
if [ "`id -g ${CLICKHOUSE_GROUP}`" -eq 0 ]; then
echo "The ${CLICKHOUSE_USER} system user must not have root as primary group.
if [ "`id -g ${CLICKHOUSE_GROUP}`" -eq 0 ]; then
echo "The ${CLICKHOUSE_USER} system user must not have root as primary group.
Please fix this and reinstall this package." >&2
exit 1
fi
exit 1
fi
if [ ! -d ${CLICKHOUSE_DATADIR} ]; then
# only for compatibility for old /opt/clickhouse, remove after 2017-06-01
if [ -d ${CLICKHOUSE_DATADIR_OLD} ]; then
ln -s ${CLICKHOUSE_DATADIR_OLD} ${CLICKHOUSE_DATADIR}
else
# DONT remove after 2017-06-01 :
mkdir -p ${CLICKHOUSE_DATADIR}
chown ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_DATADIR}
chmod 700 ${CLICKHOUSE_DATADIR}
fi
fi
if [ ! -d ${CLICKHOUSE_DATADIR} ]; then
mkdir -p ${CLICKHOUSE_DATADIR}
chown ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_DATADIR}
chmod 700 ${CLICKHOUSE_DATADIR}
fi
if [ ! -d ${CLICKHOUSE_LOGDIR} ]; then
mkdir -p ${CLICKHOUSE_LOGDIR}
chown root:${CLICKHOUSE_GROUP} ${CLICKHOUSE_LOGDIR}
# Allow everyone to read logs, root and clickhouse to read-write
chmod 775 ${CLICKHOUSE_LOGDIR}
fi
if [ ! -d ${CLICKHOUSE_LOGDIR} ]; then
mkdir -p ${CLICKHOUSE_LOGDIR}
chown root:${CLICKHOUSE_GROUP} ${CLICKHOUSE_LOGDIR}
# Allow everyone to read logs, root and clickhouse to read-write
chmod 775 ${CLICKHOUSE_LOGDIR}
fi
if [ -d ${CLICKHOUSE_LOGDIR} ]; then
# only for compatibility for old metrika user, remove string after 2017-06-01 :
su -s /bin/sh ${CLICKHOUSE_USER} -c "test -w ${CLICKHOUSE_LOGDIR}" || chown -R root:${CLICKHOUSE_GROUP} ${CLICKHOUSE_LOGDIR}; chmod -R ug+rw ${CLICKHOUSE_LOGDIR}
fi
if [ -d ${CLICKHOUSE_LOGDIR} ]; then
# only for compatibility for old metrika user, remove string after 2017-06-01
su -s /bin/sh ${CLICKHOUSE_USER} -c "test -w ${CLICKHOUSE_LOGDIR}" || chown -R root:${CLICKHOUSE_GROUP} ${CLICKHOUSE_LOGDIR}; chmod -R ug+rw ${CLICKHOUSE_LOGDIR}
fi
if [ -d ${CLICKHOUSE_SERVER_ETCDIR} ]; then
# -R only for compatibility for old metrika user, remove -R after 2017-06-01
su -s /bin/sh ${CLICKHOUSE_USER} -c "test -w ${CLICKHOUSE_SERVER_ETCDIR}" || chown -R ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_SERVER_ETCDIR}
fi
# Clean old dynamic compilation results
if [ -d "${CLICKHOUSE_DATADIR}/build" ]; then
rm -f ${CLICKHOUSE_DATADIR}/build/*.cpp ${CLICKHOUSE_DATADIR}/build/*.so ||:
fi
# Clean old dynamic compilation results
if [ -d "${CLICKHOUSE_DATADIR}/build" ]; then
rm -f ${CLICKHOUSE_DATADIR}/build/*.cpp ${CLICKHOUSE_DATADIR}/build/*.so ||:
fi
fi

View File

@ -95,6 +95,10 @@ check_config()
initdb()
{
if [ -d ${SYSCONFDIR} ]; then
su -s /bin/sh ${CLICKHOUSE_USER} -c "test -w ${SYSCONFDIR}" || chown ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${SYSCONFDIR}
fi
if [ -x "$BINDIR/$GENERIC_PROGRAM" ]; then
CLICKHOUSE_DATADIR_FROM_CONFIG=$(su -s $SHELL ${CLICKHOUSE_USER} -c "$BINDIR/$GENERIC_PROGRAM extract-from-config --config-file=\"$CLICKHOUSE_CONFIG\" --key=path")
if [ "(" "$?" -ne "0" ")" -o "(" -z "${CLICKHOUSE_DATADIR_FROM_CONFIG}" ")" ]; then
@ -138,12 +142,6 @@ initdb()
echo "Changing owner of [${CLICKHOUSE_LOGDIR}] to [${CLICKHOUSE_LOGDIR_USER}:${CLICKHOUSE_GROUP}]"
chown ${CLICKHOUSE_LOGDIR_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_LOGDIR}
fi
# Temporary fix for old metrika user, remove after 2017-06-01
if [ ! -z ${CLICKHOUSE_DATADIR_OLD} ] && [ -d ${CLICKHOUSE_DATADIR_OLD} ] && ! $(su -s $SHELL ${CLICKHOUSE_USER} -c "test -w ${CLICKHOUSE_DATADIR_OLD}") ; then
echo "Changing owner of old [${CLICKHOUSE_DATADIR_OLD}] to [${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP}]"
chown -RL ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_DATADIR_OLD}
fi
}

View File

@ -125,7 +125,6 @@ html_theme_options = {
'link': '#08f',
'link_hover': 'red',
'extra_nav_links': collections.OrderedDict([
('ClickHouse Meetup Berlin', 'https://events.yandex.com/events/meetings/05-10-2017/'),
('Switch to Russian <img id="svg-flag" src="/docs/en/_static/ru.svg" width="20" height="12" />', '#ru'),
('Single page documentation', '/docs/en/single/'),
('Website home', '/'),

View File

@ -7,6 +7,7 @@ There exist third-party client libraries for ClickHouse:
- `infi.clickhouse_orm <https://github.com/Infinidat/infi.clickhouse_orm>`_
- `sqlalchemy-clickhouse <https://github.com/cloudflare/sqlalchemy-clickhouse>`_
- `clickhouse-driver <https://github.com/mymarilyn/clickhouse-driver>`_
- `clickhouse-client <https://github.com/yurial/clickhouse-client>`_
* PHP
- `clickhouse-php-client <https://github.com/8bitov/clickhouse-php-client>`_
- `PhpClickHouseClient <https://github.com/SevaCode/PhpClickHouseClient>`_

View File

@ -40,7 +40,9 @@ Users are recorded in the ``users`` section. Let's look at part of the ``users.x
</web>
Here we can see that two users are declared: ``default`` and ``web``. We added the ``web`` user ourselves.
The ``default`` user is chosen in cases when the username is not passed, so this user must be present in the config file. The ``default`` user is also used for distributed query processing - the system accesses remote servers under this username. So the ``default`` user must have an empty password and must not have substantial restrictions or quotas - otherwise, distributed queries will fail.
The ``default`` user is chosen in cases when the username is not passed. The ``default`` user can also be used for distributed query processing - the system accesses remote servers under this username if no ``user`` and ``password`` are configured for that server in the cluster configuration (see also the section about the "Distributed" table engine).
For connections between servers inside a cluster, use a user without any substantial restrictions or quotas - otherwise, distributed queries will fail.
The password is specified in plain text directly in the config. In this regard, you should not consider these passwords as providing security against potential malicious attacks. Rather, they are necessary for protection from Yandex employees.
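For illustration, a cluster definition with per-replica credentials might look like the following sketch (the cluster name, host, and password are placeholders, not taken from the document):

.. code-block:: xml

  <remote_servers>
      <my_cluster>
          <shard>
              <replica>
                  <host>example01-01-1</host>
                  <port>9000</port>
                  <!-- If user and password are omitted, the default user with an empty password is used. -->
                  <user>web</user>
                  <password>some_password</password>
              </replica>
          </shard>
      </my_cluster>
  </remote_servers>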

View File

@ -7,6 +7,7 @@
- `infi.clickhouse_orm <https://github.com/Infinidat/infi.clickhouse_orm>`_
- `sqlalchemy-clickhouse <https://github.com/cloudflare/sqlalchemy-clickhouse>`_
- `clickhouse-driver <https://github.com/mymarilyn/clickhouse-driver>`_
- `clickhouse-client <https://github.com/yurial/clickhouse-client>`_
* PHP
- `clickhouse-php-client <https://github.com/8bitov/clickhouse-php-client>`_
- `PhpClickHouseClient <https://github.com/SevaCode/PhpClickHouseClient>`_

View File

@ -60,7 +60,9 @@
</web>
Here you can see that two users are declared: ``default`` and ``web``. We added the ``web`` user ourselves.
The ``default`` user is chosen in cases when the username is not passed, so this user must be present in the config file. The ``default`` user is also used for distributed query processing - the system accesses remote servers under this username. Therefore, the ``default`` user must have an empty password and must not have substantial restrictions or quotas - otherwise, distributed queries will fail.
The ``default`` user is chosen in cases when the username is not passed. The ``default`` user can also be used for distributed query processing - the system accesses remote servers under this username if no ``user`` and ``password`` are configured for that server in the cluster configuration (see also the section about the "Distributed" table engine).
The user that is used for communication between servers in a cluster must not have substantial restrictions or quotas - otherwise, distributed queries will fail.
The password is specified either in plain text (not recommended) or as a SHA-256 hash. The hash is not salted, so such passwords should not be considered protection against a potential attacker. Rather, they are needed for protection from employees.

View File

@ -598,7 +598,7 @@ ClickHouse отсекает все пробелы и один перенос с
Inserting the results of ``SELECT``
------------------------------
""""""""""""""""""""""""""""""
.. code-block:: sql
@ -613,7 +613,7 @@ ClickHouse отсекает все пробелы и один перенос с
Notes on performance
------------------------------
""""""""""""""""""""""""""""""
``INSERT`` sorts the incoming data by primary key and splits it into partitions by month. If you insert data for different months mixed together, this can significantly reduce the performance of the ``INSERT`` query. To avoid this:
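As a rough sketch (the table and its schema here are hypothetical, not from the document), insert each month as a separate batch rather than mixing months in one ``INSERT``:

.. code-block:: sql

  -- Hypothetical table partitioned by month of EventDate
  CREATE TABLE test.events (EventDate Date, id UInt64) ENGINE = MergeTree(EventDate, (id, EventDate), 8192);

  -- Each INSERT touches a single month
  INSERT INTO test.events SELECT toDate('2017-09-15'), number FROM system.numbers LIMIT 1000;
  INSERT INTO test.events SELECT toDate('2017-10-15'), number FROM system.numbers LIMIT 1000;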

View File

@ -1,3 +1,5 @@
include (${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake)
foreach (T longjmp siglongjmp)
add_executable (${T} ${T}.c)
target_link_libraries (${T} glibc-compatibility)

View File

@ -18,3 +18,5 @@ top -bn1
tail -n200 /var/log/clickhouse-server/clickhouse-server.err.log
tail -n200 /var/log/clickhouse-server/clickhouse-server.log
tail -n100 /var/log/clickhouse-server/stderr
cat /etc/lsb-release
uname -a

View File

@ -219,7 +219,7 @@ a:hover, a:active {
#announcement-link {
color: #000;
text-decoration: underline;
font: 400 200%/133% 'Yandex Sans Display Web',Arial,sans-serif;
font: 400 150%/133% 'Yandex Sans Display Web',Arial,sans-serif;
margin: 1em 0;
}
#announcement-link:hover {

View File

@ -92,7 +92,7 @@
</div>
<div id="announcement" class="colored-block">
<div class="page">
<a id="announcement-link" href="https://events.yandex.com/events/meetings/05-10-2017/" target="_blank">ClickHouse Meetup in Berlin on October 5, 2017</a>
<a id="announcement-link" href="http://bit.ly/clickhouse-meetup-palo-alto-october-2017" rel="external nofollow" target="_blank">ClickHouse Community Meetup in Palo Alto on October 25, 2017</a>
</div>
</div>