Alexey Milovidov 2016-03-03 04:55:17 +03:00
commit f84772a55e
78 changed files with 3211 additions and 440 deletions

.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "private"]
path = private
url = git@███████████.yandex-team.ru:Metrika/ClickHouse_private.git

View File

@ -49,6 +49,9 @@ ENDIF(TESTS)
# Installation prefix
SET(CMAKE_INSTALL_PREFIX /usr)
# Directory for Yandex-specific files
SET(CLICKHOUSE_PRIVATE_DIR ${METRICA_SOURCE_DIR}/private/)
include_directories (${METRICA_SOURCE_DIR}/contrib/libcityhash/)
include_directories (${METRICA_SOURCE_DIR}/contrib/liblz4/include/)
include_directories (${METRICA_SOURCE_DIR}/contrib/libdivide/)

View File

@ -232,6 +232,7 @@ add_library (dbms
include/DB/DataStreams/AddingDefaultBlockOutputStream.h
include/DB/DataStreams/JSONCompactRowOutputStream.h
include/DB/DataStreams/PartialSortingBlockInputStream.h
include/DB/DataStreams/PreSendCallbackInputStream.h
include/DB/DataStreams/MarkInCompressedFile.h
include/DB/DataStreams/CSVRowOutputStream.h
include/DB/DataStreams/CSVRowInputStream.h
@ -331,6 +332,7 @@ add_library (dbms
include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h
include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h
include/DB/Interpreters/ClusterProxy/Query.h
include/DB/Interpreters/ClusterProxy/PreSendHook.h
include/DB/Common/Allocator.h
include/DB/Common/CombinedCardinalityEstimator.h
include/DB/Common/ExternalTable.h
@ -441,6 +443,7 @@ add_library (dbms
include/DB/IO/WriteBufferFromFileBase.h
include/DB/IO/ReadBufferFromIStream.h
include/DB/IO/UncompressedCache.h
include/DB/IO/InterserverWriteBuffer.h
include/DB/Columns/ColumnTuple.h
include/DB/Columns/ColumnVector.h
include/DB/Columns/IColumnDummy.h
@ -535,7 +538,7 @@ add_library (dbms
include/DB/Storages/MergeTree/ReshardingJob.h
include/DB/Storages/MergeTree/ReshardingWorker.h
include/DB/Storages/MergeTree/MergeTreeSharder.h
include/DB/Storages/MergeTree/ShardedPartitionSender.h
include/DB/Storages/MergeTree/ShardedPartitionUploader.h
include/DB/Storages/MergeTree/RemoteQueryExecutor.h
include/DB/Storages/StorageNull.h
include/DB/Storages/System/StorageSystemSettings.h
@ -576,6 +579,7 @@ add_library (dbms
src/IO/createWriteBufferFromFileBase.cpp
src/IO/ReadBufferFromFileBase.cpp
src/IO/WriteBufferFromFileBase.cpp
src/IO/InterserverWriteBuffer.cpp
src/Columns/ColumnConst.cpp
src/Columns/ColumnArray.cpp
@ -638,7 +642,7 @@ add_library (dbms
src/Storages/MergeTree/ReshardingJob.cpp
src/Storages/MergeTree/ReshardingWorker.cpp
src/Storages/MergeTree/MergeTreeSharder.cpp
src/Storages/MergeTree/ShardedPartitionSender.cpp
src/Storages/MergeTree/ShardedPartitionUploader.cpp
src/Storages/MergeTree/RemoteQueryExecutor.cpp
src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
src/Storages/ITableDeclaration.cpp

View File

@ -41,15 +41,16 @@ public:
* If the get_all flag is set, all connections are fetched.
* Throws an exception if it was not possible to allocate even a single connection.
*/
std::vector<Entry> getMany(const Settings * settings = nullptr, bool get_all = false)
std::vector<Entry> getMany(const Settings * settings = nullptr,
PoolMode pool_mode = PoolMode::GET_MANY)
{
return doGetMany(settings, get_all);
return doGetMany(settings, pool_mode);
}
protected:
virtual Entry doGet(const Settings * settings) = 0;
virtual std::vector<Entry> doGetMany(const Settings * settings, bool get_all)
virtual std::vector<Entry> doGetMany(const Settings * settings, PoolMode pool_mode)
{
return std::vector<Entry>{ get(settings) };
}

View File

@ -89,10 +89,10 @@ private:
/** Allocates up to the specified number of connections for use.
* The connections provide access to different replicas of one shard.
*/
std::vector<Entry> doGetMany(const Settings * settings, bool get_all) override
std::vector<Entry> doGetMany(const Settings * settings, PoolMode pool_mode) override
{
applyLoadBalancing(settings);
return Base::getMany(settings, get_all);
return Base::getMany(settings, pool_mode);
}
void applyLoadBalancing(const Settings * settings)

View File

@ -28,7 +28,7 @@ public:
* If the get_all_replicas flag is set, all connections are fetched.
*/
MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info = false, bool do_broadcast = false);
bool append_extra_info = false, PoolMode pool_mode_ = PoolMode::GET_MANY);
/** Accepts pools, one per shard, from which one or several
* connections will need to be fetched.
@ -37,7 +37,7 @@ public:
* If the do_broadcast flag is set, all connections are fetched.
*/
MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info = false, bool do_broadcast = false);
bool append_extra_info = false, PoolMode pool_mode_ = PoolMode::GET_MANY);
/// Send the entire contents of the external tables to the replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
@ -155,7 +155,7 @@ private:
/// The query has been cancelled
bool cancelled = false;
bool do_broadcast = false;
PoolMode pool_mode = PoolMode::GET_MANY;
/// Mutex so that the sendCancel function can execute safely
/// in a separate thread.

View File

@ -8,6 +8,12 @@
#include <common/logger_useful.h>
enum class PoolMode
{
GET_ONE = 0,
GET_MANY,
GET_ALL
};
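For orientation, a minimal hedged sketch of how a caller picks one of these modes when requesting connections; the pool and settings objects below are assumed to exist in the calling code and are not part of this commit:
auto one  = pool->getMany(&settings, PoolMode::GET_ONE);   /// a single connection per shard
auto many = pool->getMany(&settings, PoolMode::GET_MANY);  /// up to max_parallel_replicas connections
auto all  = pool->getMany(&settings, PoolMode::GET_ALL);   /// every replica, as the old get_all flag did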
/** A class you can inherit from to get a pool of something. Used for database connection pools.
* The descendant must provide a method for creating a new object to put into the pool.

View File

@ -14,6 +14,7 @@ namespace ErrorCodes
{
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int CANNOT_CLOCK_GETTIME;
extern const int LOGICAL_ERROR;
}
}
@ -112,15 +113,19 @@ public:
/** Allocates up to the specified number of connections for use.
* The connections provide access to different replicas of one shard.
*/
std::vector<Entry> getMany(const DB::Settings * settings, bool get_all)
std::vector<Entry> getMany(const DB::Settings * settings, PoolMode pool_mode)
{
ResourceTracker resource_tracker{nested_pools.size()};
UInt64 max_connections;
if (get_all)
if (pool_mode == PoolMode::GET_ALL)
max_connections = nested_pools.size();
else
else if (pool_mode == PoolMode::GET_ONE)
max_connections = 1;
else if (pool_mode == PoolMode::GET_MANY)
max_connections = settings ? UInt64(settings->max_parallel_replicas) : 1;
else
throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);
bool skip_unavailable = settings ? UInt64(settings->skip_unavailable_shards) : false;
@ -134,7 +139,7 @@ public:
if (getResource(entry, fail_messages, &resource_tracker, settings))
connections.push_back(entry);
else if (get_all || ((i == 0) && !skip_unavailable))
else if ((pool_mode == PoolMode::GET_ALL) || ((i == 0) && !skip_unavailable))
throw DB::NetException("All connection tries failed. Log: \n\n" + fail_messages.str() + "\n", DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
else
break;

View File

@ -0,0 +1,48 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Interpreters/ClusterProxy/PreSendHook.h>
#include <Poco/SharedPtr.h>
#include <functional>
namespace DB
{
class PreSendCallbackInputStream : public IProfilingBlockInputStream
{
public:
PreSendCallbackInputStream(Poco::SharedPtr<IInterpreter> & interpreter_, ClusterProxy::PreSendHook::Callback callback)
: interpreter(interpreter_), pre_send_callback(std::bind(callback, nullptr))
{
}
String getName() const override { return "PreSendCallbackInput"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
protected:
Block readImpl() override
{
if (!is_sent)
{
is_sent = true;
pre_send_callback();
stream = interpreter->execute().in;
}
return stream->read();
}
private:
bool is_sent = false;
Poco::SharedPtr<IInterpreter> interpreter;
BlockInputStreamPtr stream;
std::function<void()> pre_send_callback;
};
}
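A hedged sketch of the intended use, mirroring the query constructors changed later in this commit: wrap a local interpreter so that the pre-send callback fires once, right before the local query starts executing. The query_ast, context and hook objects are assumed to exist in the caller.
Poco::SharedPtr<IInterpreter> interpreter = new InterpreterAlterQuery(query_ast, context);
ClusterProxy::PreSendHook::Callback callback = hook.makeCallback();
BlockInputStreamPtr stream = new PreSendCallbackInputStream(interpreter, callback);
/// The callback runs on the first read() from the stream, just before interpreter->execute() is called.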

View File

@ -5,6 +5,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/Throttler.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ClusterProxy/PreSendHook.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/MultiplexedConnections.h>
@ -42,12 +43,14 @@ public:
~RemoteBlockInputStream() override;
/// Send the query to all existing replicas.
void doBroadcast();
void setPoolMode(PoolMode pool_mode_);
/// Besides the blocks themselves, also receive extra information about them.
void appendExtraInfo();
/// Attach a callback that will be invoked before the query is sent to the remote servers.
void attachPreSendCallback(ClusterProxy::PreSendHook::Callback callback);
/// Sends the query (initiating the computation) before read is called.
void readPrefix() override;
@ -72,6 +75,11 @@ public:
return multiplexed_connections->getBlockExtraInfo();
}
size_t getConnectionCount() const
{
return multiplexed_connections->size();
}
protected:
/// Send all temporary tables to the remote servers.
void sendExternalTables();
@ -131,6 +139,8 @@ private:
std::vector<ExternalTablesData> external_tables_data;
std::mutex external_tables_mutex;
std::function<void()> pre_send_callback;
/// Connections to the replicas have been established, but the query has not been sent yet.
std::atomic<bool> established { false };
@ -161,7 +171,7 @@ private:
std::atomic<bool> got_unknown_packet_from_replica { false };
bool append_extra_info = false;
bool do_broadcast = false;
PoolMode pool_mode = PoolMode::GET_MANY;
Logger * log = &Logger::get("RemoteBlockInputStream");
};

View File

@ -0,0 +1,55 @@
#pragma once
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <Poco/SharedPtr.h>
#include <Poco/Net/HTTPClientSession.h>
namespace DB
{
namespace
{
constexpr auto DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT = 1;
constexpr auto DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT = 1800;
constexpr auto DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT = 1800;
}
/** Allows writing a file to a remote server.
*/
class InterserverWriteBuffer final : public WriteBuffer
{
public:
InterserverWriteBuffer(const std::string & host_, int port_,
const std::string & endpoint_,
const std::string & path_,
bool compress_ = false,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT, 0),
const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT, 0),
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT, 0));
~InterserverWriteBuffer();
void finalize();
void cancel();
private:
void nextImpl() override;
private:
std::string host;
int port;
std::string path;
Poco::Net::HTTPClientSession session;
std::ostream * ostr; /// owned by the session
Poco::SharedPtr<WriteBuffer> impl;
/// All data has been sent and the file has been renamed
bool finalized = false;
};
}
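A hedged usage sketch of the new buffer; the host, port, endpoint name and path are illustrative values only. Write the payload, then call finalize(); if finalize() is never reached, the destructor attempts it and logs any failure, while cancel() abandons the transfer.
InterserverWriteBuffer out("remote-host", 9009, "SomeEndpoint", "/path/on/remote/server");
writeString("payload", out);  /// any WriteBuffer-based serialization works here
out.finalize();               /// flush the remaining data to the remote server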

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <functional>
namespace DB
@ -23,5 +24,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<bool> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<bool> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
}
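A hedged sketch of the new cancellation_hook overloads: the hook is called on every iteration of the copy loop and is expected to throw to abort the transfer. The in and out buffers stand for any ReadBuffer and WriteBuffer.
std::atomic<bool> must_cancel{false};
copyData(in, out, [&must_cancel]
{
    if (must_cancel)
        throw Exception("Copy was cancelled");  /// throwing from the hook stops the copy
});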

View File

@ -80,6 +80,7 @@ public:
using ShardsInfo = std::vector<ShardInfo>;
public:
String getName() const { return name; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const Addresses & getShardsAddresses() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
@ -99,7 +100,14 @@ public:
private:
void initMisc();
/// Create a unique name based on the list of addresses and ports.
/// We need it in order to be able to perform resharding requests
/// on tables that have the distributed engine.
void assignName();
private:
/// Cluster name.
String name;
/// Description of the cluster's shards.
ShardsInfo shards_info;
/// Any remote shard.
@ -113,12 +121,19 @@ private:
size_t local_shard_count = 0;
};
struct Clusters
class Clusters
{
typedef std::map<String, Cluster> Impl;
Impl impl;
public:
Clusters(const Settings & settings, const String & config_name = "remote_servers");
Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;
public:
using Impl = std::map<String, Cluster>;
public:
Impl impl;
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/ClusterProxy/PreSendHook.h>
#include <DB/Parsers/IAST.h>
#include <DB/Storages/IStorage.h>
#include <DB/Client/ConnectionPool.h>
@ -11,8 +12,6 @@ namespace DB
class Settings;
class Context;
class Cluster;
class IInterpreter;
class RemoteBlockInputStream;
class Throttler;
namespace ClusterProxy
@ -22,12 +21,19 @@ class IQueryConstructor
{
public:
virtual ~IQueryConstructor() {}
void setPreSendHook(PreSendHook & pre_send_hook_) { pre_send_hook = pre_send_hook_; }
void setupBarrier(size_t count) { if (pre_send_hook) { pre_send_hook.setupBarrier(count); } }
virtual BlockInputStreamPtr createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address) = 0;
virtual BlockInputStreamPtr createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) = 0;
virtual BlockInputStreamPtr createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & new_settings, ThrottlerPtr throttler, const Context & context) = 0;
virtual bool isInclusive() const = 0;
protected:
PreSendHook pre_send_hook;
};
}

View File

@ -0,0 +1,85 @@
#pragma once
#include <DB/Common/Exception.h>
#include <functional>
#include <memory>
#include <boost/thread/barrier.hpp>
namespace DB
{
class RemoteBlockInputStream;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace ClusterProxy
{
struct PreSendHook
{
public:
using PreProcess = std::function<void(const RemoteBlockInputStream *)>;
using PostProcess = std::function<void()>;
using Callback = PreProcess;
public:
PreSendHook() = default;
PreSendHook(PreProcess pre_process_)
: pre_process(pre_process_), is_initialized(pre_process)
{
}
PreSendHook(PreProcess pre_process_, PostProcess post_process_)
: pre_process(pre_process_), post_process(post_process_),
is_initialized(pre_process_)
{
}
operator bool() const
{
return is_initialized;
}
void setupBarrier(size_t count)
{
if (!is_initialized)
throw Exception("PreSendHook: uninitialized object", ErrorCodes::LOGICAL_ERROR);
if (barrier)
throw Exception("PreSendHook: barrier already set up", ErrorCodes::LOGICAL_ERROR);
barrier = std::make_shared<boost::barrier>(count);
}
Callback makeCallback()
{
if (!is_initialized)
throw Exception("PreSendHook: uninitialized object", ErrorCodes::LOGICAL_ERROR);
auto callback = [=](const RemoteBlockInputStream * remote_stream)
{
pre_process(remote_stream);
if (barrier)
{
barrier->count_down_and_wait();
if (post_process)
post_process();
}
};
return callback;
}
private:
PreProcess pre_process;
PostProcess post_process;
std::shared_ptr<boost::barrier> barrier;
bool is_initialized = false;
};
}
}
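A hedged sketch of how the hook is meant to be wired up; stream_count and remote_stream are placeholders. Each stream runs the per-stream pre-process first, then waits on the shared barrier, and once all streams have arrived the post-process runs in each of them.
ClusterProxy::PreSendHook hook(
    [](const RemoteBlockInputStream * stream) { /* per-stream check before the query is sent */ },
    [] { /* runs in each stream after all of them have reached the barrier */ });
hook.setupBarrier(stream_count);   /// must match the number of streams that will invoke the callback
auto callback = hook.makeCallback();
callback(remote_stream);           /// in this commit the call is made via attachPreSendCallback()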

View File

@ -56,6 +56,7 @@ private:
Field last_partition;
WeightedZooKeeperPaths weighted_zookeeper_paths;
ASTPtr sharding_key_expr;
Field coordinator;
static PartitionCommand dropPartition(const Field & partition, bool detach, bool unreplicated)
{
@ -78,9 +79,10 @@ private:
}
static PartitionCommand reshardPartitions(const Field & first_partition_, const Field & last_partition_,
const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const ASTPtr & sharding_key_expr)
const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const ASTPtr & sharding_key_expr_, const Field & coordinator_)
{
return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_expr};
return {RESHARD_PARTITION, first_partition_, false, false, false, {},
last_partition_, weighted_zookeeper_paths_, sharding_key_expr_, coordinator_};
}
};

View File

@ -64,7 +64,7 @@ class InterserverIOEndpoint
{
public:
virtual std::string getId(const std::string & path) const = 0;
virtual void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) = 0;
virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) = 0;
virtual ~InterserverIOEndpoint() {}
void cancel() { is_cancelled = true; }

View File

@ -197,6 +197,8 @@ struct Settings
M(SettingBool, enable_http_compression, 0) \
/** Compression level - used if the client said over HTTP that it understands data compressed with gzip or deflate */ \
M(SettingInt64, http_zlib_compression_level, 3) \
/** Timeout in seconds for resharding barriers */ \
M(SettingUInt64, resharding_barrier_timeout, 120) \
/// All sorts of restrictions on query execution.
Limits limits;

View File

@ -17,8 +17,10 @@ namespace ErrorCodes
* DROP COLUMN col_drop,
* MODIFY COLUMN col_name type,
* DROP PARTITION partition
* RESHARD PARTITION partition TO /path/to/zookeeper/table WEIGHT w, ... USING column
* ...
* RESHARD PARTITION partition
* TO '/path/to/zookeeper/table' [WEIGHT w], ...
* USING expression
* [COORDINATE WITH 'coordinator_id']
*/
class ASTAlterQuery : public IAST
@ -71,6 +73,7 @@ public:
ASTPtr last_partition;
ASTPtr weighted_zookeeper_paths;
ASTPtr sharding_key_expr;
ASTPtr coordinator;
/// deep copy
void clone(Parameters & p) const;

View File

@ -13,7 +13,10 @@ namespace DB
* [DROP|DETACH|ATTACH [UNREPLICATED] PARTITION|PART partition, ...]
* [FETCH PARTITION partition FROM ...]
* [FREEZE PARTITION]
* [RESHARD PARTITION partition TO zookeeper/path/to/partition [WEIGHT w] [, ...] USING sharding_key]
* [RESHARD PARTITION partition
* TO '/path/to/zookeeper/table' [WEIGHT w], ...
* USING expression
* [COORDINATE WITH 'coordinator_id']]
*/
class ParserAlterQuery : public IParserBase
{

View File

@ -241,8 +241,10 @@ public:
/** Execute the RESHARD PARTITION query.
*/
virtual void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
virtual void reshardPartitions(ASTPtr query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, const Field & coordinator,
const Settings & settings)
{
throw Exception("Method reshardPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);

View File

@ -27,7 +27,7 @@ public:
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override;
private:
MergeTreeData::DataPartPtr findPart(const String & name);

View File

@ -3,6 +3,7 @@
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <atomic>
#include <functional>
namespace DB
{
@ -15,12 +16,16 @@ class ReshardingJob;
*/
class MergeTreeDataMerger
{
public:
using CancellationHook = std::function<void()>;
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)>;
public:
static const size_t NO_LIMIT = std::numeric_limits<size_t>::max();
MergeTreeDataMerger(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Merger)")) {}
typedef std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> AllowedMergingPredicate;
void setCancellationHook(CancellationHook cancellation_hook_);
/** Selects which parts to merge. Uses a bunch of heuristics.
* If merge_anything_for_old_months is set, the size-ratio restriction is lifted for parts from past months.
@ -53,7 +58,6 @@ public:
*/
MergeTreeData::PerShardDataParts reshardPartition(
const ReshardingJob & job,
size_t aio_threshold,
DiskSpaceMonitor::Reservation * disk_reservation = nullptr);
/// Approximate amount of disk space needed for a merge. With a margin.
@ -66,7 +70,7 @@ public:
void uncancel() { cancelled = false; }
bool isCancelled() const { return cancelled; }
void abortIfRequested() const;
void abortIfRequested();
private:
/** Select all parts belonging to one partition.
@ -81,6 +85,8 @@ private:
/// When we last wrote to the log that disk space had run out (so as not to write about it too often).
time_t disk_space_warning_time = 0;
CancellationHook cancellation_hook;
std::atomic<bool> cancelled {false};
};
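A hedged sketch of the new cancellation hook, assuming abortIfRequested() invokes it between merge steps; the merger object and the flag are placeholders. Throwing from the hook aborts the merge in progress.
std::atomic<bool> must_cancel{false};
merger.setCancellationHook([&must_cancel]
{
    if (must_cancel)
        throw Exception("Merge was cancelled");
});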

View File

@ -21,7 +21,7 @@ public:
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override;
private:
const Context & context;

View File

@ -20,7 +20,7 @@ public:
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override;
private:
Context & context;

View File

@ -6,30 +6,47 @@
namespace DB
{
class StorageReplicatedMergeTree;
/** Description of a resharding job.
*/
struct ReshardingJob final
{
public:
ReshardingJob() = default;
/// Creates a description from its serialized representation.
ReshardingJob(const std::string & serialized_job);
ReshardingJob(const std::string & database_name_, const std::string & table_name_,
const std::string & partition_, const WeightedZooKeeperPaths & paths_,
const ASTPtr & sharding_key_expr_);
const ASTPtr & sharding_key_expr_, const std::string & coordinator_id_);
ReshardingJob(const ReshardingJob &) = delete;
ReshardingJob & operator=(const ReshardingJob &) = delete;
ReshardingJob(ReshardingJob &&) = default;
ReshardingJob & operator=(ReshardingJob &&) = default;
operator bool() const;
/// Serializes the job description.
std::string toString() const;
bool isCoordinated() const;
void clear();
public:
std::string database_name;
std::string table_name;
std::string partition;
WeightedZooKeeperPaths paths;
ASTPtr sharding_key_expr;
std::string coordinator_id;
StorageReplicatedMergeTree * storage = nullptr;
UInt64 block_number = 0;
bool is_aborted = false;
};
}

View File

@ -1,20 +1,27 @@
#pragma once
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/ReshardingJob.h>
#include <DB/Storages/AlterCommands.h>
#include <common/logger_useful.h>
#include <zkutil/RWLock.h>
#include <zkutil/SingleBarrier.h>
#include <zkutil/Barrier.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/SharedPtr.h>
#include <string>
#include <thread>
#include <atomic>
#include <functional>
namespace DB
{
class Context;
class Cluster;
class StorageReplicatedMergeTree;
class ReshardingJob;
/** Executor of resharding jobs.
* Works in the background inside a single thread.
@ -23,6 +30,16 @@ class ReshardingJob;
*/
class ReshardingWorker final
{
public:
using PartitionList = std::vector<std::string>;
enum Status
{
STATUS_OK = 0,
STATUS_ERROR, /// An error occurred on one of the performers.
STATUS_ON_HOLD /// The job has been put on hold.
};
public:
ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_);
@ -35,20 +52,52 @@ public:
/// Start the thread that executes resharding jobs.
void start();
void shutdown();
/// Submit a resharding request.
void submitJob(const std::string & database_name,
const std::string & table_name,
const std::string & partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr);
void submitJob(const ReshardingJob & job);
/// Has the thread been started?
bool isStarted() const;
/// Create a new coordinator for a distributed job. Called from the initiator.
std::string createCoordinator(const Cluster & cluster);
/// Register a query with the given coordinator.
void registerQuery(const std::string & coordinator_id, const std::string & query);
/// Delete a coordinator.
void deleteCoordinator(const std::string & coordinator_id);
/// Subscribe to the given coordinator. Called from a performer.
UInt64 subscribe(const std::string & coordinator_id, const std::string & query);
/// Unsubscribe from the given coordinator. Called from a performer.
void unsubscribe(const std::string & coordinator_id);
/// Increase the number of partitions included in one distributed job. Called from a performer.
void addPartitions(const std::string & coordinator_id, const PartitionList & partition_list);
///
ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id, ReshardingWorker::PartitionList & partition_list);
/// Get the number of partitions included in one distributed job. Called from a performer.
size_t getPartitionCount(const std::string & coordinator_id);
/// Get the number of participating nodes.
size_t getNodeCount(const std::string & coordinator_id);
/// Wait for the checks to complete on all performers. Called from a performer.
void waitForCheckCompletion(const std::string & coordinator_id);
/// Wait for all required unsubscriptions to complete.
void waitForOptOutCompletion(const std::string & coordinator_id, size_t count);
/// Set the status of the given distributed job.
void setStatus(const std::string & coordinator_id, Status status);
/// Set the status of the given distributed job for the specified host.
void setStatus(const std::string & coordinator_id, const std::string & hostname, Status status);
/// Get the status of the given distributed job.
Status getStatus();
private:
/// Watch for new jobs to appear. Execute them sequentially.
void pollAndExecute();
/// Nudge the job scheduler.
void jabScheduler();
/// Execute the jobs that were in the execution queue when the node started.
void performPendingJobs();
@ -56,35 +105,72 @@ private:
void perform(const Strings & job_nodes);
/// Execute a single job.
void perform(const ReshardingJob & job);
void perform(const std::string & job_descriptor);
/// Split the parts of a partition into several, according to the sharding key.
/// At the same time, regroup those parts by shard and merge the parts within each group.
/// When this process completes, a new partition is created for each shard.
void createShardedPartitions(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
void createShardedPartitions();
/// Copy all partitions obtained by resharding to every replica
/// of the corresponding shards.
void publishShardedPartitions(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
void publishShardedPartitions();
/// For each shard, add the data from that shard's new partition to the table on all
/// replicas belonging to the same shard. On the local node that performs the resharding
/// job, delete the data from the original partition.
void applyChanges(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
void applyChanges();
/// Delete temporary data from the local node and from ZooKeeper.
void cleanup(StorageReplicatedMergeTree & storage, const ReshardingJob & job);
void softCleanup();
void hardCleanup();
void cleanupCommon();
/// Forcibly terminate the thread.
void abortIfRequested() const;
void abortPollingIfRequested();
void abortCoordinatorIfRequested(const std::string & coordinator_id);
void abortRecoveryIfRequested();
void abortJobIfRequested();
Status getCoordinatorStatus(const std::string & coordinator_id);
/// Register the job with the corresponding coordinator.
void attachJob();
/// Detach the job from the coordinator.
void detachJob();
/// Wait for the uploads to complete on all performers.
void waitForUploadCompletion();
size_t getPartitionCountUnlocked(const std::string & coordinator_id);
bool updateOfflineNodes();
/// Functions that create the objects needed to synchronize
/// distributed jobs.
zkutil::RWLock createLock();
zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
zkutil::Barrier createCheckBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);
std::string computeHash(const std::string & in);
std::string getCoordinatorPath(const std::string & coordinator_id) const;
std::string getPartitionPath(const ReshardingJob & job) const;
private:
ReshardingJob current_job;
std::thread polling_thread;
std::string host_task_queue_path;
std::string distributed_path;
std::string distributed_online_path;
std::string distributed_lock_path;
std::string coordination_path;
Context & context;
Logger * log;
std::unique_ptr<MergeTreeDataMerger> merger;
std::thread polling_thread;
mutable std::mutex cancel_mutex;
std::string host_task_queue_path;
std::atomic<bool> is_started{false};
std::atomic<bool> must_stop{false};
};

View File

@ -1,15 +1,17 @@
#pragma once
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/IO/WriteBuffer.h>
#include <common/logger_useful.h>
#include <functional>
namespace DB
{
class StorageReplicatedMergeTree;
namespace ShardedPartitionSender
namespace ShardedPartitionUploader
{
/** Service for fetching parts from a partition of a *MergeTree table.
@ -21,10 +23,11 @@ public:
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override;
private:
StorageReplicatedMergeTree & storage;
MergeTreeData & data;
Logger * log;
};
@ -33,14 +36,29 @@ private:
class Client final
{
public:
Client();
using CancellationHook = std::function<void()>;
public:
Client(StorageReplicatedMergeTree & storage_);
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
bool send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
const std::string & part, size_t shard_no);
void setCancellationHook(CancellationHook cancellation_hook_);
bool send(const std::string & part_name, size_t shard_no,
const InterserverIOEndpointLocation & to_location);
void cancel() { is_cancelled = true; }
private:
MergeTreeData::DataPartPtr findShardedPart(const std::string & name, size_t shard_no);
void abortIfRequested();
private:
StorageReplicatedMergeTree & storage;
MergeTreeData & data;
CancellationHook cancellation_hook;
std::atomic<bool> is_cancelled{false};
Logger * log;
};

View File

@ -76,8 +76,10 @@ public:
void shutdown() override;
void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
void reshardPartitions(ASTPtr query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, const Field & coordinator,
const Settings & settings) override;
/// Get a description of the corresponding local table from each replica.

View File

@ -13,7 +13,7 @@
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/DataPartsExchange.h>
#include <DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/ShardedPartitionSender.h>
#include <DB/Storages/MergeTree/ShardedPartitionUploader.h>
#include <DB/Storages/MergeTree/RemoteQueryExecutor.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <zkutil/ZooKeeper.h>
@ -135,8 +135,10 @@ public:
void attachPartition(ASTPtr query, const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
void freezePartition(const Field & partition, const Settings & settings) override;
void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
void reshardPartitions(ASTPtr query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, const Field & coordinator,
const Settings & settings) override;
/** Removes the replica from ZooKeeper. If there are no other replicas, removes the entire table from ZooKeeper.
@ -190,8 +192,8 @@ private:
friend class ScopedPartitionMergeLock;
friend class ReshardingWorker;
friend class ShardedPartitionSender::Client;
friend class ShardedPartitionSender::Service;
friend class ShardedPartitionUploader::Client;
friend class ShardedPartitionUploader::Service;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
@ -248,7 +250,7 @@ private:
InterserverIOEndpointHolderPtr endpoint_holder;
InterserverIOEndpointHolderPtr disk_space_monitor_endpoint_holder;
InterserverIOEndpointHolderPtr sharded_partition_sender_endpoint_holder;
InterserverIOEndpointHolderPtr sharded_partition_uploader_endpoint_holder;
InterserverIOEndpointHolderPtr remote_query_executor_endpoint_holder;
MergeTreeData data;
@ -258,7 +260,7 @@ private:
DataPartsExchange::Fetcher fetcher;
RemoteDiskSpaceMonitor::Client disk_space_monitor_client;
ShardedPartitionSender::Client sharded_partition_sender_client;
ShardedPartitionUploader::Client sharded_partition_uploader_client;
RemoteQueryExecutor::Client remote_query_executor_client;
zkutil::LeaderElectionPtr leader_election;

View File

@ -39,8 +39,8 @@ MultiplexedConnections::MultiplexedConnections(Connection * connection_, const S
}
MultiplexedConnections::MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, bool do_broadcast_)
: settings(settings_), throttler(throttler_), do_broadcast(do_broadcast_)
bool append_extra_info, PoolMode pool_mode_)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
{
if (pool_ == nullptr)
throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR);
@ -55,8 +55,8 @@ MultiplexedConnections::MultiplexedConnections(IConnectionPool * pool_, const Se
}
MultiplexedConnections::MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, bool do_broadcast_)
: settings(settings_), throttler(throttler_), do_broadcast(do_broadcast_)
bool append_extra_info, PoolMode pool_mode_)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
{
if (pools_.empty())
throw Exception("Pools are not specified", ErrorCodes::LOGICAL_ERROR);
@ -274,7 +274,7 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
void MultiplexedConnections::initFromShard(IConnectionPool * pool)
{
auto entries = pool->getMany(settings, do_broadcast);
auto entries = pool->getMany(settings, pool_mode);
/// If getMany() did not allocate any connections and did not throw an exception, it means
/// that the skip_unavailable_shards setting was enabled. In that case we simply return.

View File

@ -1,4 +1,3 @@
<!-- Config that is looked up in the current directory. -->
<config>
<compression>false</compression>
</config>

View File

@ -1,4 +1,3 @@
<!-- Config installed to /etc/clickhouse-client/. It is used if no other configs are found. -->
<config>
<compression>false</compression>
</config>

View File

@ -325,6 +325,19 @@ namespace ErrorCodes
extern const int UNKNOWN_STATUS_OF_INSERT = 319;
extern const int DUPLICATE_SHARD_PATHS = 320;
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE = 321;
extern const int RESHARDING_BUSY_CLUSTER = 322;
extern const int RESHARDING_BUSY_SHARD = 323;
extern const int RESHARDING_NO_SUCH_COORDINATOR = 324;
extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP = 325;
extern const int RESHARDING_ALREADY_SUBSCRIBED = 326;
extern const int RESHARDING_REMOTE_NODE_UNAVAILABLE = 327;
extern const int RESHARDING_REMOTE_NODE_ERROR = 328;
extern const int RESHARDING_COORDINATOR_DELETED = 329;
extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD = 330;
extern const int RESHARDING_INVALID_QUERY = 331;
extern const int RESHARDING_INITIATOR_CHECK_FAILED = 332;
extern const int RWLOCK_ALREADY_HELD = 333;
extern const int BARRIER_TIMEOUT = 334;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -1,6 +1,7 @@
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Common/NetException.h>
namespace DB
{
@ -58,9 +59,9 @@ RemoteBlockInputStream::~RemoteBlockInputStream()
multiplexed_connections->disconnect();
}
void RemoteBlockInputStream::doBroadcast()
void RemoteBlockInputStream::setPoolMode(PoolMode pool_mode_)
{
do_broadcast = true;
pool_mode = pool_mode_;
}
void RemoteBlockInputStream::appendExtraInfo()
@ -68,6 +69,11 @@ void RemoteBlockInputStream::appendExtraInfo()
append_extra_info = true;
}
void RemoteBlockInputStream::attachPreSendCallback(ClusterProxy::PreSendHook::Callback callback)
{
pre_send_callback = std::bind(callback, this);
}
void RemoteBlockInputStream::readPrefix()
{
if (!sent_query)
@ -238,10 +244,10 @@ void RemoteBlockInputStream::createMultiplexedConnections()
multiplexed_connections = std::make_unique<MultiplexedConnections>(connection, multiplexed_connections_settings, throttler);
else if (pool != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(pool, multiplexed_connections_settings, throttler,
append_extra_info, do_broadcast);
append_extra_info, pool_mode);
else if (!pools.isNull())
multiplexed_connections = std::make_unique<MultiplexedConnections>(*pools, multiplexed_connections_settings, throttler,
append_extra_info, do_broadcast);
append_extra_info, pool_mode);
else
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
@ -259,7 +265,19 @@ void RemoteBlockInputStream::init(const Settings * settings_)
void RemoteBlockInputStream::sendQuery()
{
createMultiplexedConnections();
try
{
createMultiplexedConnections();
}
catch (...)
{
if (pre_send_callback)
pre_send_callback();
throw;
}
if (pre_send_callback)
pre_send_callback();
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;

View File

@ -8,6 +8,7 @@
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
#include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
#include <DB/DataStreams/copyData.h>

View File

@ -6,47 +6,48 @@
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/BlockInputStreamFromRowInputStream.h>
#include <DB/DataStreams/BlockOutputStreamFromRowOutputStream.h>
using namespace DB;
int main(int argc, char ** argv)
try
{
try
Block sample;
{
DB::Block sample;
{
DB::ColumnWithTypeAndName col;
col.type = new DB::DataTypeUInt64;
sample.insert(col);
}
{
DB::ColumnWithTypeAndName col;
col.type = new DB::DataTypeString;
sample.insert(col);
}
std::ifstream istr("test_in");
std::ofstream ostr("test_out");
DB::ReadBufferFromIStream in_buf(istr);
DB::WriteBufferFromOStream out_buf(ostr);
DB::TabSeparatedRowInputStream row_input(in_buf, sample);
DB::TabSeparatedRowOutputStream row_output(out_buf, sample);
DB::copyData(row_input, row_output);
ColumnWithTypeAndName col;
col.type = new DataTypeUInt64;
sample.insert(col);
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
ColumnWithTypeAndName col;
col.type = new DataTypeString;
sample.insert(col);
}
ReadBufferFromFile in_buf("test_in");
WriteBufferFromFile out_buf("test_out");
RowInputStreamPtr row_input = new TabSeparatedRowInputStream(in_buf, sample);
RowOutputStreamPtr row_output = new TabSeparatedRowOutputStream(out_buf, sample);
BlockInputStreamFromRowInputStream block_input(row_input, sample);
BlockOutputStreamFromRowOutputStream block_output(row_output);
copyData(block_input, block_output);
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
return 1;
}

View File

@ -11,6 +11,7 @@
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/BlockExtraInfoInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>

View File

@ -11,6 +11,7 @@
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>

View File

@ -0,0 +1,104 @@
#include <DB/IO/InterserverWriteBuffer.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <Poco/URI.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
}
InterserverWriteBuffer::InterserverWriteBuffer(const std::string & host_, int port_,
const std::string & endpoint_,
const std::string & path_,
bool compress_,
size_t buffer_size_,
const Poco::Timespan & connection_timeout,
const Poco::Timespan & send_timeout,
const Poco::Timespan & receive_timeout)
: WriteBuffer(nullptr, 0), host(host_), port(port_), path(path_)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
std::string encoded_endpoint;
Poco::URI::encode(endpoint_, "&#", encoded_endpoint);
std::string compress_str = compress_ ? "true" : "false";
std::string encoded_compress;
Poco::URI::encode(compress_str, "&#", encoded_compress);
std::stringstream uri;
uri << "http://" << host << ":" << port
<< "/?endpoint=" << encoded_endpoint
<< "&compress=" << encoded_compress
<< "&path=" << encoded_path;
std::string uri_str = Poco::URI(uri.str()).getPathAndQuery();
session.setHost(host);
session.setPort(port);
session.setKeepAlive(true);
/// set the timeouts
session.setTimeout(connection_timeout, send_timeout, receive_timeout);
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri_str, Poco::Net::HTTPRequest::HTTP_1_1);
request.setChunkedTransferEncoding(true);
ostr = &session.sendRequest(request);
impl = new WriteBufferFromOStream(*ostr, buffer_size_);
set(impl->buffer().begin(), impl->buffer().size());
}
InterserverWriteBuffer::~InterserverWriteBuffer()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void InterserverWriteBuffer::nextImpl()
{
if (!offset() || finalized)
return;
/// For correct operation with AsynchronousWriteBuffer, which swaps buffers.
impl->set(buffer().begin(), buffer().size());
impl->position() = pos;
impl->next();
}
void InterserverWriteBuffer::finalize()
{
if (finalized)
return;
next();
finalized = true;
}
void InterserverWriteBuffer::cancel()
{
finalized = true;
}
}

View File

@ -7,8 +7,10 @@
namespace DB
{
namespace
{
static void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic<bool> * is_cancelled)
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic<bool> * is_cancelled)
{
/// If we have read to the end of the buffer, eof() will either fill the buffer with new data and move the cursor to the beginning, or return false.
while (bytes > 0 && !from.eof())
@ -27,6 +29,26 @@ static void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes,
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook)
{
/// If we have read to the end of the buffer, eof() will either fill the buffer with new data and move the cursor to the beginning, or return false.
while (bytes > 0 && !from.eof())
{
if (cancellation_hook)
cancellation_hook();
/// buffer() - the chunk of data available for reading; position() - the cursor marking how far we have already read.
size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
to.write(from.position(), count);
from.position() += count;
bytes -= count;
}
if (check_bytes && bytes > 0)
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
}
void copyData(ReadBuffer & from, WriteBuffer & to)
{
@ -38,6 +60,11 @@ void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<bool> & is_cancel
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled);
}
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
{
copyDataImpl(from, to, true, bytes, nullptr);
@ -48,4 +75,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<boo
copyDataImpl(from, to, true, bytes, &is_cancelled);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook)
{
copyDataImpl(from, to, true, bytes, cancellation_hook);
}
}

View File

@ -2,8 +2,10 @@
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/isLocalAddress.h>
#include <DB/Common/SimpleCache.h>
#include <DB/IO/HexWriteBuffer.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <openssl/sha.h>
namespace DB
{
@ -339,6 +341,48 @@ void Cluster::initMisc()
break;
}
}
assignName();
}
void Cluster::assignName()
{
std::vector<std::string> elements;
if (!addresses.empty())
{
for (const auto & address : addresses)
elements.push_back(address.host_name + ":" + toString(address.port));
}
else if (!addresses_with_failover.empty())
{
for (const auto & addresses : addresses_with_failover)
{
for (const auto & address : addresses)
elements.push_back(address.host_name + ":" + toString(address.port));
}
}
else
throw Exception("Cluster: ill-formed cluster", ErrorCodes::LOGICAL_ERROR);
std::sort(elements.begin(), elements.end());
unsigned char hash[SHA512_DIGEST_LENGTH];
SHA512_CTX ctx;
SHA512_Init(&ctx);
for (const auto & host : elements)
SHA512_Update(&ctx, reinterpret_cast<const void *>(host.data()), host.size());
SHA512_Final(hash, &ctx);
{
WriteBufferFromString buf(name);
HexWriteBuffer hex_buf(buf);
hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
}
}
}

View File

@ -1,7 +1,6 @@
#include <DB/Interpreters/ClusterProxy/AlterQueryConstructor.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/BlockExtraInfoInputStream.h>
#include <DB/DataStreams/PreSendCallbackInputStream.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
namespace DB
@ -12,20 +11,47 @@ namespace ClusterProxy
BlockInputStreamPtr AlterQueryConstructor::createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address)
{
InterpreterAlterQuery interpreter(query_ast, context);
return interpreter.execute().in;
if (pre_send_hook)
{
Poco::SharedPtr<IInterpreter> interpreter = new InterpreterAlterQuery(query_ast, context);
auto callback = pre_send_hook.makeCallback();
return new PreSendCallbackInputStream(interpreter, callback);
}
else
{
InterpreterAlterQuery interpreter(query_ast, context);
return interpreter.execute().in;
}
}
BlockInputStreamPtr AlterQueryConstructor::createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
return new RemoteBlockInputStream{pool, query, &settings, throttler};
auto stream = new RemoteBlockInputStream{pool, query, &settings, throttler};
stream->setPoolMode(PoolMode::GET_ONE);
if (pre_send_hook)
{
auto callback = pre_send_hook.makeCallback();
stream->attachPreSendCallback(callback);
}
return stream;
}
BlockInputStreamPtr AlterQueryConstructor::createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
return new RemoteBlockInputStream{pools, query, &settings, throttler};
auto stream = new RemoteBlockInputStream{pools, query, &settings, throttler};
stream->setPoolMode(PoolMode::GET_ONE);
if (pre_send_hook)
{
auto callback = pre_send_hook.makeCallback();
stream->attachPreSendCallback(callback);
}
return stream;
}
bool AlterQueryConstructor::isInclusive() const

View File

@ -2,6 +2,7 @@
#include <DB/Interpreters/InterpreterDescribeQuery.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/BlockExtraInfoInputStream.h>
#include <DB/DataStreams/PreSendCallbackInputStream.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
namespace DB
@ -26,22 +27,42 @@ namespace ClusterProxy
BlockInputStreamPtr DescribeQueryConstructor::createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address)
{
InterpreterDescribeQuery interpreter(query_ast, context);
BlockInputStreamPtr stream;
if (pre_send_hook)
{
Poco::SharedPtr<IInterpreter> interpreter = new InterpreterDescribeQuery(query_ast, context);
auto callback = pre_send_hook.makeCallback();
stream = new PreSendCallbackInputStream(interpreter, callback);
}
else
{
InterpreterDescribeQuery interpreter(query_ast, context);
stream = interpreter.execute().in;
}
/** Materialization is needed, because constants arrive from remote servers already materialized.
* If this is not done, different threads would end up with different column types (Const and non-Const),
* which is not allowed, since all the code assumes that within a stream of blocks all types are the same.
*/
BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
return new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
* If this is not done, different threads would end up with different column types (Const and non-Const),
* which is not allowed, since all the code assumes that within a stream of blocks all types are the same.
*/
BlockInputStreamPtr materialized_stream = new MaterializingBlockInputStream(stream);
return new BlockExtraInfoInputStream(materialized_stream, toBlockExtraInfo(address));
}
BlockInputStreamPtr DescribeQueryConstructor::createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = new RemoteBlockInputStream{pool, query, &settings, throttler};
stream->doBroadcast();
stream->setPoolMode(PoolMode::GET_ALL);
stream->appendExtraInfo();
if (pre_send_hook)
{
auto callback = pre_send_hook.makeCallback();
stream->attachPreSendCallback(callback);
}
return stream;
}
@ -49,8 +70,15 @@ BlockInputStreamPtr DescribeQueryConstructor::createRemote(ConnectionPoolsPtr &
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = new RemoteBlockInputStream{pools, query, &settings, throttler};
stream->doBroadcast();
stream->setPoolMode(PoolMode::GET_ALL);
stream->appendExtraInfo();
if (pre_send_hook)
{
auto callback = pre_send_hook.makeCallback();
stream->attachPreSendCallback(callback);
}
return stream;
}

View File

@ -68,11 +68,57 @@ BlockInputStreams Query::execute()
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
ConnectionPoolsPtr pools;
bool do_init = true;
/// Loop over the shards.
/// Compute the number of parallel streams.
size_t stream_count = 0;
size_t pool_count = 0;
size_t current_thread = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
bool create_local_queries = shard_info.isLocal();
bool create_remote_queries = query_constructor.isInclusive() ? shard_info.hasRemoteConnections() : !create_local_queries;
if (create_local_queries)
stream_count += shard_info.local_addresses.size();
if (create_remote_queries)
{
size_t excess = (current_thread < remainder) ? 1 : 0;
size_t actual_pools_per_thread = pools_per_thread + excess;
if (actual_pools_per_thread == 1)
{
++stream_count;
++current_thread;
}
else
{
if (do_init)
{
pool_count = 0;
do_init = false;
}
++pool_count;
if (pool_count == actual_pools_per_thread)
{
++stream_count;
do_init = true;
++current_thread;
}
}
}
}
query_constructor.setupBarrier(stream_count);
/// Loop over the shards.
ConnectionPoolsPtr pools;
do_init = true;
current_thread = 0;
for (const auto & shard_info : cluster.getShardsInfo())
{
bool create_local_queries = shard_info.isLocal();

View File

@ -1,6 +1,7 @@
#include <DB/Interpreters/ClusterProxy/SelectQueryConstructor.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/PreSendCallbackInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
namespace DB
@ -17,25 +18,55 @@ SelectQueryConstructor::SelectQueryConstructor(const QueryProcessingStage::Enum
BlockInputStreamPtr SelectQueryConstructor::createLocal(ASTPtr query_ast, const Context & context, const Cluster::Address & address)
{
InterpreterSelectQuery interpreter(query_ast, context, processed_stage);
BlockInputStreamPtr stream;
if (pre_send_hook)
{
Poco::SharedPtr<IInterpreter> interpreter = new InterpreterSelectQuery(query_ast, context, processed_stage);
auto callback = pre_send_hook.makeCallback();
stream = new PreSendCallbackInputStream(interpreter, callback);
}
else
{
InterpreterSelectQuery interpreter(query_ast, context, processed_stage);
stream = interpreter.execute().in;
}
/** Materialization is needed, because constants arrive from remote servers already materialized.
* If this is not done, different threads would end up with different column types (Const and non-Const),
* which is not allowed, since all the code assumes that within a stream of blocks all types are the same.
*/
return new MaterializingBlockInputStream(interpreter.execute().in);
* If this is not done, different threads would end up with different column types (Const and non-Const),
* which is not allowed, since all the code assumes that within a stream of blocks all types are the same.
*/
return new MaterializingBlockInputStream(stream);
}
BlockInputStreamPtr SelectQueryConstructor::createRemote(IConnectionPool * pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
return new RemoteBlockInputStream{pool, query, &settings, throttler, external_tables, processed_stage, context};
auto stream = new RemoteBlockInputStream{pool, query, &settings, throttler, external_tables, processed_stage, context};
stream->setPoolMode(PoolMode::GET_MANY);
if (pre_send_hook)
{
auto callback = pre_send_hook.makeCallback();
stream->attachPreSendCallback(callback);
}
return stream;
}
BlockInputStreamPtr SelectQueryConstructor::createRemote(ConnectionPoolsPtr & pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
return new RemoteBlockInputStream{pools, query, &settings, throttler, external_tables, processed_stage, context};
auto stream = new RemoteBlockInputStream{pools, query, &settings, throttler, external_tables, processed_stage, context};
stream->setPoolMode(PoolMode::GET_MANY);
if (pre_send_hook)
{
auto callback = pre_send_hook.makeCallback();
stream->attachPreSendCallback(callback);
}
return stream;
}
bool SelectQueryConstructor::isInclusive() const

View File

@ -67,7 +67,9 @@ BlockIO InterpreterAlterQuery::execute()
break;
case PartitionCommand::RESHARD_PARTITION:
table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key_expr, context.getSettingsRef());
table->reshardPartitions(query_ptr, database_name, command.partition, command.last_partition,
command.weighted_zookeeper_paths, command.sharding_key_expr,
command.coordinator, context.getSettingsRef());
break;
default:
@ -190,8 +192,12 @@ void InterpreterAlterQuery::parseAlter(
weighted_zookeeper_paths.emplace_back(weighted_zookeeper_path.path, weighted_zookeeper_path.weight);
}
Field coordinator;
if (params.coordinator)
coordinator = dynamic_cast<const ASTLiteral &>(*params.coordinator).value;
out_partition_commands.push_back(PartitionCommand::reshardPartitions(
first_partition, last_partition, weighted_zookeeper_paths, params.sharding_key_expr));
first_partition, last_partition, weighted_zookeeper_paths, params.sharding_key_expr, coordinator));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);

View File

@ -20,6 +20,7 @@ void ASTAlterQuery::Parameters::clone(Parameters & p) const
if (last_partition) p.last_partition = last_partition->clone();
if (weighted_zookeeper_paths) p.weighted_zookeeper_paths = weighted_zookeeper_paths->clone();
if (sharding_key_expr) p.sharding_key_expr = sharding_key_expr->clone();
if (coordinator) p.coordinator = coordinator->clone();
}
void ASTAlterQuery::addParameters(const Parameters & params)
@ -37,6 +38,8 @@ void ASTAlterQuery::addParameters(const Parameters & params)
children.push_back(params.weighted_zookeeper_paths);
if (params.sharding_key_expr)
children.push_back(params.sharding_key_expr);
if (params.coordinator)
children.push_back(params.coordinator);
}
ASTAlterQuery::ASTAlterQuery(StringRange range_) : IAST(range_)
@ -159,6 +162,15 @@ void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & st
<< "USING " << (settings.hilite ? hilite_none : "");
p.sharding_key_expr->formatImpl(settings, state, frame);
if (p.coordinator)
{
settings.ostr << settings.nl_or_ws;
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< "COORDINATE WITH " << (settings.hilite ? hilite_none : "");
p.coordinator->formatImpl(settings, state, frame);
}
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);

View File

@ -36,6 +36,8 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
ParserString s_to("TO", true, true);
ParserString s_using("USING", true, true);
ParserString s_key("KEY", true, true);
ParserString s_coordinate("COORDINATE", true, true);
ParserString s_with("WITH", true, true);
ParserString s_comma(",");
ParserString s_doubledot("..");
@ -256,6 +258,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
{
ParserList weighted_zookeeper_paths_p(ParserPtr(new ParserWeightedZooKeeperPath), ParserPtr(new ParserString(",")), false);
ParserExpressionWithOptionalAlias parser_sharding_key_expr(false);
ParserStringLiteral parser_coordinator;
ws.ignore(pos, end);
@ -299,6 +302,21 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
ws.ignore(pos, end);
if (s_coordinate.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
if (!s_with.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (!parser_coordinator.parse(pos, end, params.coordinator, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
params.type = ASTAlterQuery::RESHARD_PARTITION;
}
else
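After this change the parser accepts an optional trailing clause of the form COORDINATE WITH '<coordinator id>' following the USING expression. An illustrative query (table, partition, sharding key and coordinator id are invented; the TO clause is elided because its syntax is defined by ParserWeightedZooKeeperPath, which is not shown in this hunk):

ALTER TABLE hits RESHARD PARTITION 201603 TO ... USING UserID COORDINATE WITH 'resharding-coordinator-1'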

View File

@ -25,7 +25,7 @@ INSTALL(
COMPONENT clickhouse-server)
INSTALL(
FILES ../../../../ClickHouse_private/Server/metrika/config.xml ../../../../ClickHouse_private/Server/metrika/users.xml
FILES ${CLICKHOUSE_PRIVATE_DIR}/Server/metrika/config.xml ${CLICKHOUSE_PRIVATE_DIR}/Server/metrika/users.xml
DESTINATION /etc/clickhouse-server/metrika
COMPONENT clickhouse-server
OPTIONAL)

View File

@ -2,6 +2,7 @@
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ReadBufferFromIStream.h>
namespace DB
@ -20,20 +21,22 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
{
HTMLForm params(request);
std::ostringstream request_ostream;
request_ostream << request.stream().rdbuf();
std::string request_string = request_ostream.str();
//std::ostringstream request_ostream;
//request_ostream << request.stream().rdbuf();
//std::string request_string = request_ostream.str();
LOG_TRACE(log, "Request URI: " << request.getURI());
LOG_TRACE(log, "Request body: " << request_string);
//LOG_TRACE(log, "Request body: " << request_string);
std::istringstream request_istream(request_string);
//std::istringstream request_istream(request_string);
/// NOTE: Authentication could be added here if it ever becomes necessary.
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
ReadBufferFromIStream body(request.stream());
WriteBufferFromHTTPServerResponse out(response);
auto endpoint = server.global_context->getInterserverIOHandler().getEndpoint(endpoint_name);
@ -41,11 +44,11 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
if (compress)
{
CompressedWriteBuffer compressed_out(out);
endpoint->processQuery(params, compressed_out);
endpoint->processQuery(params, body, compressed_out);
}
else
{
endpoint->processQuery(params, out);
endpoint->processQuery(params, body, out);
}
out.finalize();
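With this change every interserver endpoint receives the request body as a streaming ReadBuffer instead of the handler first reading it into a string. A sketch of the assumed endpoint interface after the change; the actual declaration lives in InterserverIOHandler.h, which is not part of this hunk, and the class name, getId and the virtual destructor are inferred from the InterserverIOEndpointPtr usage and the Service implementations elsewhere in this commit:

/// Editorial sketch, not the actual header.
class InterserverIOEndpoint
{
public:
    virtual std::string getId(const std::string & node_id) const = 0;
    virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) = 0;
    virtual ~InterserverIOEndpoint() {}
};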

View File

@ -320,11 +320,13 @@ int Server::main(const std::vector<std::string> & args)
global_context->setCurrentDatabase(config().getString("default_database", "default"));
bool has_resharding_worker = false;
if (has_zookeeper && config().has("resharding"))
{
auto resharding_worker = std::make_shared<ReshardingWorker>(config(), "resharding", *global_context);
global_context->setReshardingWorker(resharding_worker);
resharding_worker->start();
has_resharding_worker = true;
}
SCOPE_EXIT(
@ -422,7 +424,18 @@ int Server::main(const std::vector<std::string> & args)
LOG_INFO(log, "Ready for connections.");
SCOPE_EXIT(
LOG_DEBUG(log, "Received termination signal. Waiting for current connections to close.");
LOG_DEBUG(log, "Received termination signal.");
if (has_resharding_worker)
{
LOG_INFO(log, "Shutting down resharding thread");
auto & resharding_worker = global_context->getReshardingWorker();
if (resharding_worker.isStarted())
resharding_worker.shutdown();
LOG_DEBUG(log, "Shut down resharding thread");
}
LOG_DEBUG(log, "Waiting for current connections to close.");
users_config_reloader.reset();

View File

@ -30,7 +30,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out)
{
if (is_cancelled)
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

View File

@ -50,6 +50,10 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 1.6;
/// because between selecting the parts and reserving the space, the amount of free space may become slightly smaller.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4;
void MergeTreeDataMerger::setCancellationHook(CancellationHook cancellation_hook_)
{
cancellation_hook = cancellation_hook_;
}
/// Select a range of at most max_parts_to_merge_at_once parts (or slightly more, see merge_more_parts_if_sum_bytes_is_less_than)
/// such that the largest part is less than max_size_ratio_to_merge_parts times the sum of the rest.
@ -515,9 +519,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
}
MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
const ReshardingJob & job,
size_t aio_threshold, DiskSpaceMonitor::Reservation * disk_reservation)
const ReshardingJob & job, DiskSpaceMonitor::Reservation * disk_reservation)
{
size_t aio_threshold = data.context.getSettings().min_bytes_to_use_direct_io;
/// Gather all the parts of the partition.
DayNum_t month = MergeTreeData::getMonthFromName(job.partition);
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(month);
@ -592,7 +597,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// Shard the merged blocks.
/// Used for numbering the blocks.
SimpleIncrement increment(data.getMaxDataPartIndex());
SimpleIncrement increment(job.block_number);
/// Create a new part for each shard.
MergeTreeData::PerShardDataParts per_shard_data_parts;
@ -774,11 +779,13 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
void MergeTreeDataMerger::abortIfRequested() const
void MergeTreeDataMerger::abortIfRequested()
{
if (cancelled)
throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED);
if (cancellation_hook)
cancellation_hook();
}
}

View File

@ -36,7 +36,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out)
{
if (is_cancelled)
throw Exception("RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED);

View File

@ -35,7 +35,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out)
{
if (is_cancelled)
throw Exception("RemoteQueryExecutor service terminated", ErrorCodes::ABORTED);
@ -50,6 +50,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
flag = false;
}

View File

@ -165,8 +165,8 @@ void ReplicatedMergeTreeRestartingThread::run()
storage.disk_space_monitor_endpoint_holder->cancel();
storage.disk_space_monitor_endpoint_holder = nullptr;
storage.sharded_partition_sender_endpoint_holder->cancel();
storage.sharded_partition_sender_endpoint_holder = nullptr;
storage.sharded_partition_uploader_endpoint_holder->cancel();
storage.sharded_partition_uploader_endpoint_holder = nullptr;
storage.remote_query_executor_endpoint_holder->cancel();
storage.remote_query_executor_endpoint_holder = nullptr;

View File

@ -34,6 +34,9 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job)
if (!parser.parse(pos, end, sharding_key_expr, max_parsed_pos, expected))
throw Exception("ReshardingJob: Internal error", ErrorCodes::LOGICAL_ERROR);
readBinary(coordinator_id, buf);
readBinary(block_number, buf);
while (!buf.eof())
{
std::string path;
@ -48,15 +51,25 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job)
ReshardingJob::ReshardingJob(const std::string & database_name_, const std::string & table_name_,
const std::string & partition_, const WeightedZooKeeperPaths & paths_,
const ASTPtr & sharding_key_expr_)
const ASTPtr & sharding_key_expr_, const std::string & coordinator_id_)
: database_name(database_name_),
table_name(table_name_),
partition(partition_),
paths(paths_),
sharding_key_expr(sharding_key_expr_)
sharding_key_expr(sharding_key_expr_),
coordinator_id(coordinator_id_)
{
}
ReshardingJob::operator bool() const
{
return !database_name.empty()
&& !table_name.empty()
&& !partition.empty()
&& !paths.empty()
&& (storage != nullptr);
}
std::string ReshardingJob::toString() const
{
std::string serialized_job;
@ -66,6 +79,8 @@ std::string ReshardingJob::toString() const
writeBinary(table_name, buf);
writeBinary(partition, buf);
writeBinary(queryToString(sharding_key_expr), buf);
writeBinary(coordinator_id, buf);
writeBinary(block_number, buf);
for (const auto & path : paths)
{
@ -77,4 +92,21 @@ std::string ReshardingJob::toString() const
return serialized_job;
}
bool ReshardingJob::isCoordinated() const
{
return !coordinator_id.empty();
}
void ReshardingJob::clear()
{
database_name.clear();
table_name.clear();
partition.clear();
paths.clear();
coordinator_id.clear();
storage = nullptr;
block_number = 0;
is_aborted = false;
}
}
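Taken together, the constructor and toString() above imply the following serialized layout. This is a sketch for orientation only: database_name is presumably written first but that write sits outside the hunk, and the type of the per-path weight is assumed because the write loop is truncated here.

/// Serialized ReshardingJob (writeBinary framing), in order:
///   database_name       : String (assumed; written outside this hunk)
///   table_name          : String
///   partition           : String
///   sharding_key_expr   : String (query text, re-parsed in the constructor)
///   coordinator_id      : String   (added in this change)
///   block_number        : UInt64   (added in this change)
///   repeated until EOF:
///     path              : String
///     weight            : UInt64 (assumed)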

File diff suppressed because it is too large

View File

@ -1,99 +0,0 @@
#include <DB/Storages/MergeTree/ShardedPartitionSender.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
}
namespace ShardedPartitionSender
{
namespace
{
std::string getEndpointId(const std::string & node_id)
{
return "ShardedPartitionSender:" + node_id;
}
}
Service::Service(StorageReplicatedMergeTree & storage_)
: storage(storage_), log(&Logger::get("ShardedPartitionSender::Service"))
{
}
std::string Service::getId(const std::string & node_id) const
{
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
if (is_cancelled)
throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);
InterserverIOEndpointLocation from_location(params.get("from_location"));
std::string part_name = params.get("part");
size_t shard_no = std::stoul(params.get("shard"));
if (is_cancelled)
throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);
MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no);
part->is_temp = false;
const std::string old_part_path = storage.full_path + part->name;
const std::string new_part_path = storage.full_path + "detached/" + part_name;
Poco::File new_part_dir(new_part_path);
if (new_part_dir.exists())
{
LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing.");
new_part_dir.remove(true);
}
Poco::File(old_part_path).renameTo(new_part_path);
bool flag = true;
writeBinary(flag, out);
out.next();
}
Client::Client()
: log(&Logger::get("ShardedPartitionSender::Client"))
{
}
bool Client::send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
const std::string & part, size_t shard_no)
{
ReadBufferFromHTTP::Params params =
{
{"endpoint", getEndpointId(to_location.name)},
{"from_location", from_location.toString()},
{"compress", "false"},
{"part", part},
{"shard", toString(shard_no)}
};
ReadBufferFromHTTP in(to_location.host, to_location.port, params);
bool flag;
readBinary(flag, in);
assertEOF(in);
return flag;
}
}
}

View File

@ -0,0 +1,211 @@
#include <DB/Storages/MergeTree/ShardedPartitionUploader.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/InterserverWriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
}
namespace ShardedPartitionUploader
{
namespace
{
std::string getEndpointId(const std::string & node_id)
{
return "ShardedPartitionUploader:" + node_id;
}
}
Service::Service(StorageReplicatedMergeTree & storage_)
: storage(storage_), data(storage_.getData()),
log(&Logger::get("ShardedPartitionUploader::Service"))
{
}
std::string Service::getId(const std::string & node_id) const
{
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out)
{
std::string part_name = params.get("path");
std::string replica_path = params.get("endpoint");
String full_part_name = std::string("detached/") + "tmp_" + part_name;
String part_path = data.getFullPath() + full_part_name + "/";
Poco::File part_file(part_path);
if (part_file.exists())
{
LOG_ERROR(log, "Directory " + part_path + " already exists. Removing.");
part_file.remove(true);
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->name = full_part_name;
new_data_part->is_temp = true;
size_t files;
readBinary(files, body);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, body);
readBinary(file_size, body);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(body, hashing_out, file_size, is_cancelled);
if (is_cancelled)
{
part_file.remove(true);
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}
uint128 expected_hash;
readBinary(expected_hash, body);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
assertEOF(body);
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0);
new_data_part->loadColumns(true);
new_data_part->loadChecksums(true);
new_data_part->loadIndex();
new_data_part->is_sharded = false;
new_data_part->checksums.checkEqual(checksums, false);
/// Now store permanently the received part.
new_data_part->is_temp = false;
const std::string old_part_path = data.getFullPath() + full_part_name;
const std::string new_part_path = data.getFullPath() + "detached/" + part_name;
Poco::File new_part_dir(new_part_path);
if (new_part_dir.exists())
{
LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing.");
new_part_dir.remove(true);
}
Poco::File(old_part_path).renameTo(new_part_path);
}
Client::Client(StorageReplicatedMergeTree & storage_)
: storage(storage_), data(storage_.getData()),
log(&Logger::get("ShardedPartitionUploader::Client"))
{
}
MergeTreeData::DataPartPtr Client::findShardedPart(const String & name, size_t shard_no)
{
MergeTreeData::DataPartPtr part = data.getShardedPartIfExists(name, shard_no);
if (part)
return part;
throw Exception("No part " + name + " in table");
}
void Client::setCancellationHook(CancellationHook cancellation_hook_)
{
cancellation_hook = cancellation_hook_;
}
bool Client::send(const std::string & part_name, size_t shard_no,
const InterserverIOEndpointLocation & to_location)
{
std::function<void()> copy_hook = std::bind(&ShardedPartitionUploader::Client::abortIfRequested, this);
abortIfRequested();
InterserverWriteBuffer out(to_location.host, to_location.port, getEndpointId(to_location.name), part_name);
LOG_TRACE(log, "Sending part " << part_name);
auto storage_lock = storage.lockStructure(false);
MergeTreeData::DataPartPtr part = findShardedPart(part_name, shard_no);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
/// We take the list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Add the files that are not present in the checksums list.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
MergeTreeData::DataPart::Checksums data_checksums;
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
String path = data.getFullPath() + "reshard/" + toString(shard_no) + "/" + part_name + "/" + file_name;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out, copy_hook);
abortIfRequested();
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
}
part->checksums.checkEqual(data_checksums, false);
return true;
}
void Client::abortIfRequested()
{
if (is_cancelled)
throw Exception("ShardedPartitionUploader service terminated", ErrorCodes::ABORTED);
if (cancellation_hook)
cancellation_hook();
}
}
}
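Client::send and Service::processQuery above agree on a simple length-prefixed stream. A sketch of the format as reconstructed from the code in this hunk (all framing uses the writeBinary/writeStringBinary helpers):

/// Part-upload stream, in order:
///   file_count                  : size_t
///   repeated file_count times:
///     file_name                 : String
///     file_size                 : UInt64
///     file contents             : file_size raw bytes (hashed on both sides while copying)
///     content hash              : uint128
/// checksums.txt and columns.txt are transferred like any other file but are
/// excluded from the final checksum comparison performed on both sides.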

View File

@ -6,6 +6,7 @@
#include <DB/Storages/VirtualColumnFactory.h>
#include <DB/Storages/Distributed/DistributedBlockOutputStream.h>
#include <DB/Storages/Distributed/DirectoryMonitor.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTSelectQuery.h>
@ -35,6 +36,9 @@ namespace DB
namespace ErrorCodes
{
extern const int STORAGE_REQUIRES_PARAMETER;
extern const int RESHARDING_NO_WORKER;
extern const int RESHARDING_INVALID_PARAMETERS;
extern const int RESHARDING_INITIATOR_CHECK_FAILED;
}
@ -225,61 +229,118 @@ void StorageDistributed::shutdown()
directory_monitors.clear();
}
void StorageDistributed::reshardPartitions(const String & database_name, const Field & first_partition,
const Field & last_partition, const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, const Settings & settings)
void StorageDistributed::reshardPartitions(ASTPtr query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, const Field & coordinator,
const Settings & settings)
{
/// Create a query of the form ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu.
auto & resharding_worker = context.getReshardingWorker();
if (!resharding_worker.isStarted())
throw Exception("Resharding background thread is not running", ErrorCodes::RESHARDING_NO_WORKER);
ASTPtr alter_query_ptr = new ASTAlterQuery;
auto & alter_query = static_cast<ASTAlterQuery &>(*alter_query_ptr);
if (!coordinator.isNull())
throw Exception("Use of COORDINATE WITH is forbidden in ALTER TABLE ... RESHARD"
" queries for distributed tables",
ErrorCodes::RESHARDING_INVALID_PARAMETERS);
alter_query.database = remote_database;
alter_query.table = remote_table;
std::string coordinator_id = resharding_worker.createCoordinator(cluster);
alter_query.parameters.emplace_back();
ASTAlterQuery::Parameters & parameters = alter_query.parameters.back();
parameters.type = ASTAlterQuery::RESHARD_PARTITION;
if (!first_partition.isNull())
parameters.partition = new ASTLiteral({}, first_partition);
if (!last_partition.isNull())
parameters.last_partition = new ASTLiteral({}, last_partition);
ASTPtr expr_list = new ASTExpressionList;
for (const auto & entry : weighted_zookeeper_paths)
try
{
ASTPtr weighted_path_ptr = new ASTWeightedZooKeeperPath;
auto & weighted_path = static_cast<ASTWeightedZooKeeperPath &>(*weighted_path_ptr);
weighted_path.path = entry.first;
weighted_path.weight = entry.second;
expr_list->children.push_back(weighted_path_ptr);
/// Create a query of the form ALTER TABLE ... RESHARD PARTITION ... COORDINATE WITH ...
ASTPtr alter_query_ptr = new ASTAlterQuery;
auto & alter_query = static_cast<ASTAlterQuery &>(*alter_query_ptr);
alter_query.database = remote_database;
alter_query.table = remote_table;
alter_query.parameters.emplace_back();
ASTAlterQuery::Parameters & parameters = alter_query.parameters.back();
parameters.type = ASTAlterQuery::RESHARD_PARTITION;
if (!first_partition.isNull())
parameters.partition = new ASTLiteral({}, first_partition);
if (!last_partition.isNull())
parameters.last_partition = new ASTLiteral({}, last_partition);
ASTPtr expr_list = new ASTExpressionList;
for (const auto & entry : weighted_zookeeper_paths)
{
ASTPtr weighted_path_ptr = new ASTWeightedZooKeeperPath;
auto & weighted_path = static_cast<ASTWeightedZooKeeperPath &>(*weighted_path_ptr);
weighted_path.path = entry.first;
weighted_path.weight = entry.second;
expr_list->children.push_back(weighted_path_ptr);
}
parameters.weighted_zookeeper_paths = expr_list;
parameters.sharding_key_expr = sharding_key_expr;
parameters.coordinator = new ASTLiteral({}, coordinator_id);
resharding_worker.registerQuery(coordinator_id, queryToString(alter_query_ptr));
/// We enforce the strict policy under which resharding may start if and only if
/// all the required connections were established first.
std::atomic<size_t> effective_node_count{0};
ClusterProxy::PreSendHook::PreProcess pre_process = [&effective_node_count](const RemoteBlockInputStream * remote_stream)
{
size_t count = (remote_stream != nullptr) ? remote_stream->getConnectionCount() : 1;
effective_node_count += count;
};
size_t shard_count = cluster.getRemoteShardCount() + cluster.getLocalShardCount();
ClusterProxy::PreSendHook::PostProcess post_process = [&effective_node_count, shard_count, coordinator_id]()
{
if (effective_node_count != shard_count)
{
throw Exception("Unexpected number of nodes participating in distributed job "
+ coordinator_id, ErrorCodes::RESHARDING_INITIATOR_CHECK_FAILED);
}
};
ClusterProxy::PreSendHook pre_send_hook(pre_process, post_process);
ClusterProxy::AlterQueryConstructor alter_query_constructor;
alter_query_constructor.setPreSendHook(pre_send_hook);
/** The shard_multiplexing functionality is unfinished, so we disable it.
* (Because connections to different shards within a single thread are not established in parallel.)
* See https://███████████.yandex-team.ru/METR-18300 for details.
*/
bool enable_shard_multiplexing = false;
BlockInputStreams streams = ClusterProxy::Query(alter_query_constructor, cluster, alter_query_ptr,
context, settings, enable_shard_multiplexing).execute();
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections);
streams.resize(1);
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
if (stream_ptr == nullptr)
throw Exception("StorageDistributed: Internal error", ErrorCodes::LOGICAL_ERROR);
auto & stream = *stream_ptr;
while (!stream.isCancelled() && stream.read())
;
}
catch (...)
{
try
{
resharding_worker.deleteCoordinator(coordinator_id);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
parameters.weighted_zookeeper_paths = expr_list;
parameters.sharding_key_expr = sharding_key_expr;
/** The shard_multiplexing functionality is unfinished, so we disable it.
* (Because connections to different shards within a single thread are not established in parallel.)
* See https://███████████.yandex-team.ru/METR-18300 for details.
*/
bool enable_shard_multiplexing = false;
ClusterProxy::AlterQueryConstructor alter_query_constructor;
BlockInputStreams streams = ClusterProxy::Query(alter_query_constructor, cluster, alter_query_ptr,
context, settings, enable_shard_multiplexing).execute();
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections);
streams.resize(1);
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
if (stream_ptr == nullptr)
throw Exception("StorageDistributed: Internal error", ErrorCodes::LOGICAL_ERROR);
auto & stream = *stream_ptr;
while (!stream.isCancelled() && stream.read())
;
throw;
}
}
BlockInputStreams StorageDistributed::describe(const Context & context, const Settings & settings)

View File

@ -76,6 +76,10 @@ namespace ErrorCodes
extern const int RESHARDING_INVALID_PARAMETERS;
extern const int INVALID_SHARD_WEIGHT;
extern const int DUPLICATE_SHARD_PATHS;
extern const int RESHARDING_NO_SUCH_COORDINATOR;
extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
extern const int RESHARDING_ALREADY_SUBSCRIBED;
extern const int RESHARDING_INVALID_QUERY;
}
@ -193,8 +197,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_,
settings_, database_name_ + "." + table_name, true,
std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)),
reader(data), writer(data), merger(data), fetcher(data), shutdown_event(false),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this),
shutdown_event(false), log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
@ -355,8 +359,8 @@ StoragePtr StorageReplicatedMergeTree::create(
}
{
InterserverIOEndpointPtr endpoint = new ShardedPartitionSender::Service(*res);
res->sharded_partition_sender_endpoint_holder = get_endpoint_holder(endpoint);
InterserverIOEndpointPtr endpoint = new ShardedPartitionUploader::Service(*res);
res->sharded_partition_uploader_endpoint_holder = get_endpoint_holder(endpoint);
}
{
@ -2355,8 +2359,8 @@ void StorageReplicatedMergeTree::shutdown()
disk_space_monitor_endpoint_holder = nullptr;
disk_space_monitor_client.cancel();
sharded_partition_sender_endpoint_holder = nullptr;
sharded_partition_sender_client.cancel();
sharded_partition_uploader_endpoint_holder = nullptr;
sharded_partition_uploader_client.cancel();
remote_query_executor_endpoint_holder = nullptr;
remote_query_executor_client.cancel();
@ -3449,77 +3453,234 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
unreplicated_data->freezePartition(prefix);
}
void StorageReplicatedMergeTree::reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String & database_name,
const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths,
const ASTPtr & sharding_key_expr, const Field & coordinator,
const Settings & settings)
{
auto & resharding_worker = context.getReshardingWorker();
if (!resharding_worker.isStarted())
throw Exception("Resharding background thread is not running", ErrorCodes::RESHARDING_NO_WORKER);
for (const auto & weighted_path : weighted_zookeeper_paths)
bool has_coordinator = !coordinator.isNull();
std::string coordinator_id;
UInt64 block_number = 0;
if (has_coordinator)
{
UInt64 weight = weighted_path.second;
if (weight == 0)
throw Exception("Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT);
coordinator_id = coordinator.get<const String &>();
block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
}
/// List of local partitions that need to be resharded.
ReshardingWorker::PartitionList partition_list;
/// The aforementioned list comprises:
/// - first, the list of partitions that are to be resharded on more than one
/// shard. Given any such partition, a job runs on each shard under the supervision
/// of a coordinator;
/// - second, the list of partitions that are to be resharded only on this shard.
/// The iterator below indicates the beginning of the list of these so-called
/// uncoordinated partitions.
ReshardingWorker::PartitionList::const_iterator uncoordinated_begin;
try
{
std::vector<std::string> all_paths;
all_paths.reserve(weighted_zookeeper_paths.size());
for (const auto & weighted_path : weighted_zookeeper_paths)
all_paths.push_back(weighted_path.first);
std::sort(all_paths.begin(), all_paths.end());
if (std::adjacent_find(all_paths.begin(), all_paths.end()) != all_paths.end())
throw Exception("Shard paths must be distinct", ErrorCodes::DUPLICATE_SHARD_PATHS);
{
UInt64 weight = weighted_path.second;
if (weight == 0)
throw Exception("Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT);
}
{
std::vector<std::string> all_paths;
all_paths.reserve(weighted_zookeeper_paths.size());
for (const auto & weighted_path : weighted_zookeeper_paths)
all_paths.push_back(weighted_path.first);
std::sort(all_paths.begin(), all_paths.end());
if (std::adjacent_find(all_paths.begin(), all_paths.end()) != all_paths.end())
throw Exception("Shard paths must be distinct", ErrorCodes::DUPLICATE_SHARD_PATHS);
}
DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();
if (first_partition_num && last_partition_num)
{
if (first_partition_num > last_partition_num)
throw Exception("Invalid interval of partitions", ErrorCodes::INVALID_PARTITIONS_INTERVAL);
}
if (!first_partition_num && last_partition_num)
throw Exception("Received invalid parameters for resharding", ErrorCodes::RESHARDING_INVALID_PARAMETERS);
bool include_all = !first_partition_num;
/// Build the list of local partitions that need to be resharded.
std::set<std::string> unique_partition_list;
const MergeTreeData::DataParts & data_parts = data.getDataParts();
for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
{
const MergeTreeData::DataPartPtr & current_part = *it;
DayNum_t month = current_part->month;
if (include_all || ((month >= first_partition_num) && (month <= last_partition_num)))
unique_partition_list.insert(MergeTreeData::getMonthName(month));
}
partition_list.assign(unique_partition_list.begin(), unique_partition_list.end());
if (partition_list.empty())
{
if (!has_coordinator)
throw Exception("No existing partition found", ErrorCodes::PARTITION_DOESNT_EXIST);
}
else
{
/// Make sure the structures of the local and replicated tables match.
enforceShardsConsistency(weighted_zookeeper_paths);
/// Check that there is enough free space locally and on all replicas for every job.
auto replica_to_space_info = gatherReplicaSpaceInfo(weighted_zookeeper_paths);
for (const auto & partition : partition_list)
{
size_t partition_size = data.getPartitionSize(partition);
if (!checkSpaceForResharding(replica_to_space_info, partition_size))
throw Exception("Insufficient space available for resharding operation "
"on partition " + partition, ErrorCodes::INSUFFICIENT_SPACE_FOR_RESHARDING);
}
}
if (has_coordinator)
{
size_t old_node_count = resharding_worker.getNodeCount(coordinator_id);
resharding_worker.addPartitions(coordinator_id, partition_list);
resharding_worker.waitForCheckCompletion(coordinator_id);
/// At this point, all the participating nodes know exactly the number
/// of partitions that are to be processed.
auto count = resharding_worker.getPartitionCount(coordinator_id);
if (count == 0)
throw Exception("No existing partition found", ErrorCodes::PARTITION_DOESNT_EXIST);
if (partition_list.empty())
{
/// We have no partitions, so we opt out.
resharding_worker.unsubscribe(coordinator_id);
}
resharding_worker.waitForOptOutCompletion(coordinator_id, old_node_count);
/// At this point, all the participating nodes that actually have some
/// partitions are in a coherent state.
if (partition_list.empty())
return;
if (resharding_worker.getNodeCount(coordinator_id) == 1)
{
/// Degenerate case: we are the only participating node.
/// All our jobs are uncoordinated.
resharding_worker.deleteCoordinator(coordinator_id);
uncoordinated_begin = partition_list.cbegin();
}
else
{
/// Split the list of partitions into a list of coordinated jobs
/// and a list of uncoordinated jobs.
uncoordinated_begin = resharding_worker.categorizePartitions(coordinator_id, partition_list);
}
if (uncoordinated_begin == partition_list.cbegin())
{
coordinator_id.clear();
has_coordinator = false;
}
}
else
{
/// All our jobs are uncoordinated.
uncoordinated_begin = partition_list.cbegin();
}
/// First, submit coordinated background resharding jobs.
for (auto it = partition_list.cbegin(); it != uncoordinated_begin; ++it)
{
ReshardingJob job;
job.database_name = database_name;
job.table_name = getTableName();
job.partition = *it;
job.paths = weighted_zookeeper_paths;
job.sharding_key_expr = sharding_key_expr;
job.coordinator_id = coordinator_id;
job.block_number = block_number;
resharding_worker.submitJob(job);
}
/// Then, submit uncoordinated background resharding jobs.
for (auto it = uncoordinated_begin; it != partition_list.cend(); ++it)
{
ReshardingJob job;
job.database_name = database_name;
job.table_name = getTableName();
job.partition = *it;
job.paths = weighted_zookeeper_paths;
job.sharding_key_expr = sharding_key_expr;
resharding_worker.submitJob(job);
}
}
DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();
if (first_partition_num && last_partition_num)
catch (const Exception & ex)
{
if (first_partition_num > last_partition_num)
throw Exception("Invalid interval of partitions", ErrorCodes::INVALID_PARTITIONS_INTERVAL);
if (has_coordinator)
{
if ((ex.code() == ErrorCodes::RESHARDING_NO_SUCH_COORDINATOR) ||
(ex.code() == ErrorCodes::RESHARDING_NO_COORDINATOR_MEMBERSHIP) ||
(ex.code() == ErrorCodes::RESHARDING_ALREADY_SUBSCRIBED) ||
(ex.code() == ErrorCodes::RESHARDING_INVALID_QUERY))
{
/// Theoretically any of these errors may have occurred because a user
/// has willfully attempted to botch an ongoing distributed resharding job.
/// Consequently we don't take them into account.
}
else
{
try
{
/// Before jobs are submitted, errors and cancellations are both
/// considered as errors.
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
throw;
}
if (!first_partition_num && last_partition_num)
throw Exception("Received invalid parameters for resharding", ErrorCodes::RESHARDING_INVALID_PARAMETERS);
bool include_all = !first_partition_num;
/// Build the list of local partitions that need to be resharded.
using PartitionList = std::set<std::string>;
PartitionList partition_list;
const MergeTreeData::DataParts & data_parts = data.getDataParts();
for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
catch (...)
{
const MergeTreeData::DataPartPtr & current_part = *it;
DayNum_t month = current_part->month;
if (include_all || ((month >= first_partition_num) && (month <= last_partition_num)))
partition_list.insert(MergeTreeData::getMonthName(month));
if (has_coordinator)
{
try
{
/// Before jobs are submitted, errors and cancellations are both
/// considered as errors.
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
throw;
}
if (partition_list.empty())
throw Exception("No existing partition found", ErrorCodes::PARTITION_DOESNT_EXIST);
/// Make sure the structures of the local and replicated tables match.
enforceShardsConsistency(weighted_zookeeper_paths);
/// Check that there is enough free space locally and on all replicas for every job.
auto replica_to_space_info = gatherReplicaSpaceInfo(weighted_zookeeper_paths);
for (const auto & partition : partition_list)
{
size_t partition_size = data.getPartitionSize(partition);
if (!checkSpaceForResharding(replica_to_space_info, partition_size))
throw Exception("Insufficient space available for resharding operation "
"on partition " + partition, ErrorCodes::INSUFFICIENT_SPACE_FOR_RESHARDING);
}
/// Register the background resharding jobs.
for (const auto & partition : partition_list)
resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key_expr);
}
void StorageReplicatedMergeTree::enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths)
@ -3537,7 +3698,7 @@ void StorageReplicatedMergeTree::enforceShardsConsistency(const WeightedZooKeepe
!std::equal(materialized_columns.begin(), materialized_columns.end(), columns_desc.materialized.begin()) ||
!std::equal(alias_columns.begin(), alias_columns.end(), columns_desc.alias.begin()) ||
!std::equal(column_defaults.begin(), column_defaults.end(), columns_desc.defaults.begin()))
throw Exception("Table is inconsistent accross shards", ErrorCodes::INCONSISTENT_TABLE_ACCROSS_SHARDS);
throw Exception("Table is inconsistent accross shards", ErrorCodes::INCONSISTENT_TABLE_ACCROSS_SHARDS);
}
}

View File

@ -54,7 +54,7 @@ fi
# Tests that depend on Yandex.Metrica data are not published externally. We create symlinks to the internal repository. Rather inconvenient.
QUERIES_PRIVATE_DIR="../../../ClickHouse_private/tests/${QUERIES_DIR}"
QUERIES_PRIVATE_DIR="../../../private/tests/${QUERIES_DIR}"
if [ -d "$QUERIES_PRIVATE_DIR" ]; then
for dir in $(ls $QUERIES_PRIVATE_DIR)
do

2
debian/changelog.in vendored
View File

@ -1,4 +1,4 @@
metrika-yandex (0.0.@REVISION@) unstable; urgency=low
clickhouse (0.0.@REVISION@) unstable; urgency=low
* Modified source code

View File

@ -2,7 +2,7 @@ SET(REVISIONFILE ${CMAKE_CURRENT_BINARY_DIR}/src/revision.h)
ADD_CUSTOM_COMMAND(
OUTPUT ${REVISIONFILE}
COMMAND bash -f ${CMAKE_CURRENT_SOURCE_DIR}/src/create_revision.sh ${REVISIONFILE}
COMMAND bash -f -v -x ${CMAKE_CURRENT_SOURCE_DIR}/src/create_revision.sh ${REVISIONFILE}
)
set_source_files_properties(

View File

@ -351,6 +351,7 @@ JSON::Pos JSON::skipArray() const
{
case ',':
++pos;
break;
case ']':
return ++pos;
default:
@ -382,6 +383,7 @@ JSON::Pos JSON::skipObject() const
{
case ',':
++pos;
break;
case '}':
return ++pos;
default:

View File

@ -7,13 +7,6 @@ if [[ $# -ne 1 ]]; then
exit 1
fi
out_file=$1
dir=$(dirname $out_file)
mkdir -p $dir
echo "#ifndef REVISION" > $out_file
echo -n "#define REVISION " >> $out_file
# for stash we hard-code the revision
if [ -z "$(git config --get remote.origin.url | grep github)" ];
then
@ -29,5 +22,11 @@ else
fi
fi
echo $revision >> $out_file
echo "#endif" >> $out_file
out_file=$1
dir=$(dirname $out_file)
mkdir -p $dir
echo "#ifndef REVISION
#define REVISION $revision
#endif" > $out_file

View File

@ -1,12 +1,18 @@
add_library(zkutil
src/ZooKeeper.cpp
src/Lock.cpp
src/SingleBarrier.cpp
src/Barrier.cpp
src/RWLock.cpp
src/ZooKeeperHolder.cpp
include/zkutil/Increment.h
include/zkutil/LeaderElection.h
include/zkutil/KeeperException.h
include/zkutil/Lock.h
include/zkutil/SingleBarrier.h
include/zkutil/Barrier.h
include/zkutil/RWLock.h
include/zkutil/ZooKeeper.h
include/zkutil/Types.h
include/zkutil/ZooKeeperHolder.h)

View File

@ -0,0 +1,43 @@
#pragma once
#include <zkutil/ZooKeeperHolder.h>
#include <string>
#include <functional>
namespace zkutil
{
/** Double distributed barrier for ZooKeeper.
*/
class Barrier final
{
public:
using CancellationHook = std::function<void()>;
public:
Barrier(ZooKeeperPtr zookeeper_, const std::string & path_, size_t counter_);
Barrier(const Barrier &) = delete;
Barrier & operator=(const Barrier &) = delete;
Barrier(Barrier &&) = default;
Barrier & operator=(Barrier &&) = default;
/// Register a function that checks whether barrier operation should be cancelled.
void setCancellationHook(CancellationHook cancellation_hook_);
void enter(uint64_t timeout = 0);
void leave(uint64_t timeout = 0);
private:
void abortIfRequested();
private:
ZooKeeperPtr zookeeper;
EventPtr event = new Poco::Event;
CancellationHook cancellation_hook;
std::string path;
size_t counter;
};
}

View File

@ -0,0 +1,119 @@
#pragma once
#include <zkutil/ZooKeeper.h>
#include <DB/Common/Exception.h>
#include <Poco/Event.h>
#include <string>
#include <type_traits>
#include <functional>
namespace zkutil
{
/** Distributed read/write lock for ZooKeeper.
* Such a RWLock object may not be shared among threads.
*/
class RWLock final
{
public:
enum Type
{
Read = 0,
Write
};
enum Mode
{
Blocking = 0,
NonBlocking
};
using CancellationHook = std::function<void()>;
public:
/// Create under the specified ZooKeeper path a queue for lock requests
/// if it doesn't exist yet.
RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_);
RWLock(const RWLock &) = delete;
RWLock & operator=(const RWLock &) = delete;
RWLock(RWLock &&) = default;
RWLock & operator=(RWLock &&) = default;
/// Register a function that checks whether lock acquisition should be cancelled.
void setCancellationHook(CancellationHook cancellation_hook_);
/// Get a read lock.
void acquireRead(RWLock::Mode mode);
/// Get a write lock.
void acquireWrite(RWLock::Mode mode);
/// Check whether we have acquired the lock.
bool ownsLock() const;
/// Release lock.
void release();
public:
template <Type, Mode = Blocking> class Guard;
private:
template <RWLock::Type lock_type> void acquireImpl(RWLock::Mode mode);
void abortIfRequested();
private:
ZooKeeperPtr zookeeper;
EventPtr event = new Poco::Event;
CancellationHook cancellation_hook;
/// Path to the lock request queue.
std::string path;
/// Identifier of our request for a lock.
std::string key;
bool owns_lock = false;
};
/** Scoped version of RWLock.
*/
template <RWLock::Type lock_type, RWLock::Mode lock_mode>
class RWLock::Guard final
{
static_assert((lock_type == RWLock::Read) || (lock_type == RWLock::Write), "Invalid RWLock type");
static_assert((lock_mode == RWLock::Blocking) || (lock_mode == RWLock::NonBlocking), "Invalid RWLock mode");
public:
/// Acquire lock.
Guard(RWLock & rw_lock_)
: rw_lock(rw_lock_)
{
if (lock_type == RWLock::Read)
rw_lock.acquireRead(lock_mode);
else if (lock_type == RWLock::Write)
rw_lock.acquireWrite(lock_mode);
}
/// Release lock.
~Guard()
{
if (rw_lock.ownsLock())
{
try
{
rw_lock.release();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
Guard(const Guard &) = delete;
Guard & operator=(const Guard &) = delete;
private:
RWLock & rw_lock;
};
}
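A minimal usage sketch of the scoped guard declared above; the ZooKeeper path is invented for illustration:

// Editorial sketch: take a non-blocking write lock and check whether we own it.
#include <zkutil/RWLock.h>

void example(zkutil::ZooKeeperPtr zookeeper)
{
    zkutil::RWLock lock(zookeeper, "/clickhouse/example/lock");
    zkutil::RWLock::Guard<zkutil::RWLock::Write, zkutil::RWLock::NonBlocking> guard(lock);
    if (lock.ownsLock())
    {
        /// ... perform the protected work; the guard releases the lock on scope exit.
    }
}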

View File

@ -0,0 +1,42 @@
#pragma once
#include <zkutil/ZooKeeperHolder.h>
#include <string>
#include <functional>
namespace zkutil
{
/** Single distributed barrier for ZooKeeper.
*/
class SingleBarrier final
{
public:
using CancellationHook = std::function<void()>;
public:
SingleBarrier(ZooKeeperPtr zookeeper_, const std::string & path_, size_t counter_);
SingleBarrier(const SingleBarrier &) = delete;
SingleBarrier & operator=(const SingleBarrier &) = delete;
SingleBarrier(SingleBarrier &&) = default;
SingleBarrier & operator=(SingleBarrier &&) = default;
/// Register a function that checks whether barrier operation should be cancelled.
void setCancellationHook(CancellationHook cancellation_hook_);
void enter(uint64_t timeout = 0);
private:
void abortIfRequested();
private:
ZooKeeperPtr zookeeper;
EventPtr event = new Poco::Event;
CancellationHook cancellation_hook;
std::string path;
size_t counter;
};
}

View File

@ -0,0 +1,132 @@
#include <zkutil/Barrier.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/Exception.h>
#include <Poco/Stopwatch.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BARRIER_TIMEOUT;
}
}
namespace zkutil
{
namespace
{
constexpr long wait_duration = 1000;
}
Barrier::Barrier(ZooKeeperPtr zookeeper_, const std::string & path_, size_t counter_)
: zookeeper(zookeeper_), path(path_), counter(counter_)
{
int32_t code = zookeeper->tryCreate(path, "", CreateMode::Persistent);
if ((code != ZOK) && (code != ZNODEEXISTS))
throw KeeperException(code);
}
void Barrier::setCancellationHook(CancellationHook cancellation_hook_)
{
cancellation_hook = cancellation_hook_;
}
void Barrier::enter(uint64_t timeout)
{
__sync_synchronize();
auto key = zookeeper->create(path + "/" + getFQDNOrHostName(), "", zkutil::CreateMode::Ephemeral);
key = key.substr(path.length() + 1);
Poco::Stopwatch watch;
if (timeout > 0)
watch.start();
while (true)
{
auto children = zookeeper->getChildren(path, nullptr, event);
std::sort(children.begin(), children.end());
auto it = std::lower_bound(children.cbegin(), children.cend(), key);
/// This should never happen.
if ((it == children.cend()) || (*it != key))
throw DB::Exception("Barrier: corrupted queue. Own request not found.",
DB::ErrorCodes::LOGICAL_ERROR);
if (children.size() == counter)
break;
do
{
if (static_cast<uint32_t>(watch.elapsedSeconds()) > timeout)
throw DB::Exception("Barrier: timeout", DB::ErrorCodes::BARRIER_TIMEOUT);
abortIfRequested();
}
while (!event->tryWait(wait_duration));
}
}
void Barrier::leave(uint64_t timeout)
{
__sync_synchronize();
zookeeper->remove(path + "/" + getFQDNOrHostName());
Poco::Stopwatch watch;
if (timeout > 0)
watch.start();
while (true)
{
auto children = zookeeper->getChildren(path, nullptr, event);
if (children.empty())
break;
do
{
if (static_cast<uint32_t>(watch.elapsedSeconds()) > timeout)
throw DB::Exception("Barrier: timeout", DB::ErrorCodes::BARRIER_TIMEOUT);
abortIfRequested();
}
while (!event->tryWait(wait_duration));
}
}
void Barrier::abortIfRequested()
{
if (cancellation_hook)
{
try
{
cancellation_hook();
}
catch (...)
{
try
{
event->reset();
zookeeper->tryRemove(path + "/" + getFQDNOrHostName());
}
catch (...)
{
}
throw;
}
}
}
}
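A minimal usage sketch of the double barrier implemented above: every participant calls enter() and later leave() against the same path, with counter set to the expected number of participants. Path, participant count and timeout are illustrative:

// Editorial sketch of the intended call pattern.
#include <zkutil/Barrier.h>

void example(zkutil::ZooKeeperPtr zookeeper, size_t participants)
{
    zkutil::Barrier barrier(zookeeper, "/clickhouse/example/barrier", participants);
    barrier.enter(30);    /// blocks until all participants have registered (timeout in seconds)
    /// ... coordinated work ...
    barrier.leave(30);    /// blocks until every participant has unregistered
}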

View File

@ -0,0 +1,198 @@
#include <zkutil/RWLock.h>
#include <algorithm>
#include <iterator>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int RWLOCK_ALREADY_HELD;
extern const int ABORTED;
}
}
namespace zkutil
{
namespace
{
constexpr long wait_duration = 1000;
constexpr auto prefix_length = 2;
template <RWLock::Type>
struct Prefix;
template <>
struct Prefix<RWLock::Read>
{
static constexpr auto name = "R-";
};
template <>
struct Prefix<RWLock::Write>
{
static constexpr auto name = "W-";
};
inline bool nodeQueueCmp(const std::string & lhs, const std::string & rhs)
{
return lhs.compare(prefix_length, std::string::npos, rhs, prefix_length, std::string::npos) < 0;
};
inline bool nodeQueueEquals(const std::string & lhs, const std::string & rhs)
{
return lhs.compare(prefix_length, std::string::npos, rhs, prefix_length, std::string::npos) == 0;
};
}
RWLock::RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_)
: zookeeper(zookeeper_), path(path_)
{
int32_t code = zookeeper->tryCreate(path, "", CreateMode::Persistent);
if ((code != ZOK) && (code != ZNODEEXISTS))
throw KeeperException(code);
}
void RWLock::setCancellationHook(CancellationHook cancellation_hook_)
{
cancellation_hook = cancellation_hook_;
}
void RWLock::acquireRead(Mode mode)
{
acquireImpl<RWLock::Read>(mode);
}
void RWLock::acquireWrite(Mode mode)
{
acquireImpl<RWLock::Write>(mode);
}
void RWLock::release()
{
__sync_synchronize();
if (key.empty())
throw DB::Exception("RWLock: no lock is held", DB::ErrorCodes::LOGICAL_ERROR);
zookeeper->remove(path + "/" + key);
key.clear();
}
template <typename RWLock::Type lock_type>
void RWLock::acquireImpl(Mode mode)
{
static_assert((lock_type == RWLock::Read) || (lock_type == RWLock::Write), "Invalid RWLock type");
__sync_synchronize();
if (!key.empty())
throw DB::Exception("RWLock: lock already held", DB::ErrorCodes::RWLOCK_ALREADY_HELD);
try
{
/// Enqueue a new request for a lock.
key = zookeeper->create(path + "/" + Prefix<lock_type>::name,
"", CreateMode::EphemeralSequential);
key = key.substr(path.length() + 1);
while (true)
{
auto children = zookeeper->getChildren(path);
std::sort(children.begin(), children.end(), nodeQueueCmp);
auto it = std::lower_bound(children.cbegin(), children.cend(), key, nodeQueueCmp);
/// This should never happen.
if ((it == children.cend()) || !nodeQueueEquals(*it, key))
throw DB::Exception("RWLock: corrupted lock request queue. Own request not found.",
DB::ErrorCodes::LOGICAL_ERROR);
const std::string * observed_key = nullptr;
if (lock_type == RWLock::Read)
{
/// Look for the first write lock request that is older than us.
auto it2 = std::find_if(
std::make_reverse_iterator(it),
children.crend(),
[](const std::string & child)
{
return (child.length() >= prefix_length) &&
(child.compare(0, prefix_length, Prefix<RWLock::Write>::name) == 0);
});
if (it2 != children.crend())
observed_key = &*it2;
}
else if (lock_type == RWLock::Write)
{
if (it != children.cbegin())
{
/// Take the next lock request that is older than us.
auto it2 = std::prev(it);
observed_key = &*it2;
}
}
if (observed_key == nullptr)
{
/// We hold the lock.
owns_lock = true;
break;
}
if (mode == NonBlocking)
{
zookeeper->remove(path + "/" + key);
key.clear();
break;
}
abortIfRequested();
/// Wait for our turn to come.
if (zookeeper->exists(path + "/" + *observed_key, nullptr, event))
{
do
{
abortIfRequested();
}
while (!event->tryWait(wait_duration));
}
}
}
catch (...)
{
try
{
if (!key.empty())
zookeeper->remove(path + "/" + key);
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw;
}
}
bool RWLock::ownsLock() const
{
return owns_lock;
}
void RWLock::abortIfRequested()
{
if (cancellation_hook)
cancellation_hook();
}
}

View File

@ -0,0 +1,103 @@
#include <zkutil/SingleBarrier.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/Exception.h>
#include <Poco/Stopwatch.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BARRIER_TIMEOUT;
}
}
namespace zkutil
{
namespace
{
constexpr long wait_duration = 1000;
}
SingleBarrier::SingleBarrier(ZooKeeperPtr zookeeper_, const std::string & path_, size_t counter_)
: zookeeper(zookeeper_), path(path_), counter(counter_)
{
int32_t code = zookeeper->tryCreate(path, "", CreateMode::Persistent);
if ((code != ZOK) && (code != ZNODEEXISTS))
throw KeeperException(code);
}
void SingleBarrier::setCancellationHook(CancellationHook cancellation_hook_)
{
cancellation_hook = cancellation_hook_;
}
void SingleBarrier::enter(uint64_t timeout)
{
__sync_synchronize();
auto key = zookeeper->create(path + "/" + getFQDNOrHostName(), "", zkutil::CreateMode::Ephemeral);
key = key.substr(path.length() + 1);
Poco::Stopwatch watch;
if (timeout > 0)
watch.start();
while (true)
{
auto children = zookeeper->getChildren(path, nullptr, event);
std::sort(children.begin(), children.end());
auto it = std::lower_bound(children.cbegin(), children.cend(), key);
/// This should never happen.
if ((it == children.cend()) || (*it != key))
throw DB::Exception("SingleBarrier: corrupted queue. Own request not found.",
DB::ErrorCodes::LOGICAL_ERROR);
if (children.size() == counter)
break;
do
{
if (static_cast<uint32_t>(watch.elapsedSeconds()) > timeout)
throw DB::Exception("SingleBarrier: timeout", DB::ErrorCodes::BARRIER_TIMEOUT);
abortIfRequested();
}
while (!event->tryWait(wait_duration));
}
}
void SingleBarrier::abortIfRequested()
{
if (cancellation_hook)
{
try
{
cancellation_hook();
}
catch (...)
{
try
{
event->reset();
zookeeper->tryRemove(path + "/" + getFQDNOrHostName());
}
catch (...)
{
}
throw;
}
}
}
}

1
private Submodule

@ -0,0 +1 @@
Subproject commit f0dd35eadabb7e7c9aa2ab203d54abed693e9b80

View File

@ -6,6 +6,9 @@ CONTROL=debian/control
CHLOG=debian/changelog
CHDATE=$(LC_ALL=C date -R | sed -e 's/,/\\,/g') # Replace the comma with '\,'
# Build the package with the configuration files for Yandex.Metrica.
BUILD_PACKAGE_FOR_METRIKA=$([ -f 'private/Server/metrika/config.xml' ] && echo 'yes')
# The list of daemons to build may be specified in the command-line arguments.
if [ $# -gt 0 ]
then

View File

@ -37,7 +37,7 @@ function make_control {
case "$DAEMON_PKG" in
'clickhouse-server' )
add_daemon_impl clickhouse-server-base '' 'clickhouse-server binary'
add_daemon_impl clickhouse-server-metrika "clickhouse-server-base(=0.0.$REVISION)" 'Configuration files specific for Metrika project for clickhouse-server-base package'
[ -n "$BUILD_PACKAGE_FOR_METRIKA" ] && add_daemon_impl clickhouse-server-metrika "clickhouse-server-base(=0.0.$REVISION)" 'Configuration files specific for Metrika project for clickhouse-server-base package'
add_daemon_impl clickhouse-server-common "clickhouse-server-base(=0.0.$REVISION)" 'Common configuration files for clickhouse-server-base package'
;;
'clickhouse-client' )