This commit is contained in:
Michael Kolupaev 2014-04-07 23:10:14 +04:00
commit 7d5429655a
38 changed files with 2733 additions and 293 deletions

View File

@ -26,6 +26,10 @@
M(FunctionExecute, "Function executes") \
M(MarkCacheHits, "Mark cache hits") \
M(MarkCacheMisses, "Mark cache misses") \
M(ReplicatedPartFetches, "Replicated part fetches") \
M(ObsoleteReplicatedParts, "Replicated parts rendered obsolete by fetches") \
M(ReplicatedPartMerges, "Replicated part merges") \
M(ReplicatedPartFetchesOfMerged, "Replicated part merges replaced with fetches") \
\
M(END, "")

View File

@ -227,10 +227,23 @@ namespace ErrorCodes
CLIENT_HAS_CONNECTED_TO_WRONG_PORT,
TABLE_IS_DROPPED,
DATABASE_NOT_EMPTY,
DUPLICATE_INTERSERVER_IO_ENDPOINT,
NO_SUCH_INTERSERVER_IO_ENDPOINT,
ADDING_REPLICA_TO_NON_EMPTY_TABLE,
UNEXPECTED_AST_STRUCTURE,
REPLICA_IS_ALREADY_ACTIVE,
NO_ZOOKEEPER,
NO_FILE_IN_DATA_PART,
UNEXPECTED_FILE_IN_DATA_PART,
BAD_SIZE_OF_FILE_IN_DATA_PART,
QUERY_IS_TOO_LARGE,
NOT_FOUND_EXPECTED_DATA_PART,
TOO_MANY_UNEXPECTED_DATA_PARTS,
NO_SUCH_DATA_PART,
BAD_DATA_PART_NAME,
NO_REPLICA_HAS_PART,
DUPLICATE_DATA_PART,
ABORTED,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -17,7 +17,7 @@ public:
CollapsingFinalBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_)
: description(description_), sign_column(sign_column_),
log(&Logger::get("CollapsingSortedBlockInputStream")),
log(&Logger::get("CollapsingFinalBlockInputStream")),
first(true), count_positive(0), count_negative(0), count_incorrect_data(0), blocks_fetched(0), blocks_output(0)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());

View File

@ -0,0 +1,96 @@
#pragma once
#include <Poco/URI.h>
#include <Poco/SharedPtr.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPClientSession.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <Yandex/logger_useful.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
namespace DB
{
/** Делает указанный HTTP-запрос и отдает ответ.
*/
class ReadBufferFromHTTP : public ReadBuffer
{
private:
std::string host;
int port;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// этим владеет session
Poco::SharedPtr<ReadBufferFromIStream> impl;
public:
typedef std::vector<std::pair<String, String> > Params;
ReadBufferFromHTTP(
const std::string & host_,
int port_,
const Params & params,
size_t timeout_ = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), host(host_), port(port_)
{
std::stringstream uri;
uri << "http://" << host << ":" << port << "/";
bool first = true;
for (const auto & it : params)
{
uri << (first ? "?" : "&");
first = false;
String encoded_key;
String encoded_value;
Poco::URI::encode(it.first, "=&#", encoded_key);
Poco::URI::encode(it.second, "&#", encoded_value);
uri << encoded_key << "=" << encoded_value;
}
session.setHost(host);
session.setPort(port);
/// устанавливаем таймаут
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadBufferFromHTTP")), "Sending request to " << uri.str());
session.sendRequest(request);
istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = new ReadBufferFromIStream(*istr, buffer_size_);
}
bool nextImpl()
{
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
return true;
}
};
}

View File

@ -18,6 +18,7 @@
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/VarInt.h>
#include <city.h>
#define DEFAULT_MAX_STRING_SIZE 0x00FFFFFFULL
@ -116,6 +117,11 @@ inline void readChar(char & x, ReadBuffer & buf)
void assertString(const char * s, ReadBuffer & buf);
void assertEOF(ReadBuffer & buf);
inline void assertString(const String & s, ReadBuffer & buf)
{
assertString(s.c_str(), buf);
}
inline void readBoolText(bool & x, ReadBuffer & buf)
{
@ -412,6 +418,7 @@ inline void readBinary(Float32 & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(Float64 & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(String & x, ReadBuffer & buf) { readStringBinary(x, buf); }
inline void readBinary(bool & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(uint128 & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(VisitID_t & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(mysqlxx::Date & x, ReadBuffer & buf) { readPODBinary(x, buf); }

View File

@ -1,78 +1,34 @@
#pragma once
#include <Poco/URI.h>
#include <Poco/SharedPtr.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPClientSession.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <Yandex/logger_useful.h>
#define DEFAULT_REMOTE_READ_BUFFER_TIMEOUT 1800
#include <DB/IO/ReadBufferFromHTTP.h>
#include "ReadHelpers.h"
namespace DB
{
/** Позволяет читать файл с удалённого сервера.
/** Позволяет читать файл с удалённого сервера через riod.
*/
class RemoteReadBuffer : public ReadBuffer
{
private:
std::string host;
int port;
std::string path;
bool compress;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// этим владеет session
Poco::SharedPtr<ReadBufferFromIStream> impl;
Poco::SharedPtr<ReadBufferFromHTTP> impl;
public:
RemoteReadBuffer(
const std::string & host_,
int port_,
const std::string & path_,
bool compress_ = true,
size_t timeout_ = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), host(host_), port(port_), path(path_), compress(compress_)
const std::string & host,
int port,
const std::string & path,
bool compress = true,
size_t timeout = 0,
size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
std::stringstream uri;
uri << "http://" << host << ":" << port << "/?action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false");
ReadBufferFromHTTP::Params params = {
std::make_pair("action", "read"),
std::make_pair("path", path),
std::make_pair("compress", (compress ? "true" : "false"))};
session.setHost(host);
session.setPort(port);
/// устанавливаем таймаут
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("RemoteReadBuffer")), "Sending request to " << uri.str());
session.sendRequest(request);
istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = new ReadBufferFromIStream(*istr, buffer_size_);
impl = new ReadBufferFromHTTP(host, port, params, timeout, buffer_size);
}
bool nextImpl()
@ -89,42 +45,22 @@ public:
const std::string & host,
int port,
const std::string & path,
size_t timeout_ = 0)
size_t timeout = 0)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
ReadBufferFromHTTP::Params params = {
std::make_pair("action", "list"),
std::make_pair("path", path)};
std::stringstream uri;
uri << "http://" << host << ":" << port << "/?action=list&path=" << encoded_path;
Poco::Net::HTTPClientSession session;
session.setHost(host);
session.setPort(port);
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("RemoteReadBuffer")), "Sending request to " << uri.str());
session.sendRequest(request);
std::istream * istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
ReadBufferFromHTTP in(host, port, params, timeout);
std::vector<std::string> files;
std::string s;
while (getline(*istr, s, '\n') && !s.empty())
while (!in.eof())
{
std::string s;
readString(s, in);
skipWhitespaceIfAny(in);
files.push_back(s);
}
return files;
}

View File

@ -18,6 +18,7 @@
#include <DB/IO/WriteIntText.h>
#include <DB/IO/VarInt.h>
#include <DB/IO/WriteBufferFromString.h>
#include <city.h>
#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U
@ -448,6 +449,7 @@ inline void writeBinary(const Float32 & x, WriteBuffer & buf) { writePODBinary(
inline void writeBinary(const Float64 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const String & x, WriteBuffer & buf) { writeStringBinary(x, buf); }
inline void writeBinary(const bool & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const uint128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const VisitID_t & x, WriteBuffer & buf) { writePODBinary(static_cast<const UInt64 &>(x), buf); }
inline void writeBinary(const mysqlxx::Date & x, WriteBuffer & buf) { writePODBinary(x, buf); }

View File

@ -12,6 +12,10 @@ namespace DB
*/
void copyData(ReadBuffer & from, WriteBuffer & to);
/** Копирует bytes байт из ReadBuffer в WriteBuffer
*/
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
}
#endif

View File

@ -24,8 +24,10 @@
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Client/ConnectionPool.h>
#include <statdaemons/ConfigProcessor.h>
#include <zkutil/ZooKeeper.h>
namespace DB
@ -70,6 +72,11 @@ struct ContextShared
mutable Poco::Mutex mutex; /// Для доступа и модификации разделяемых объектов.
mutable SharedPtr<zkutil::ZooKeeper> zookeeper; /// Клиент для ZooKeeper.
String interserver_io_host; /// Имя хоста по которым это сервер доступен для других серверов.
int interserver_io_port; /// и порт,
String path; /// Путь к директории с данными, со слешем на конце.
Databases databases; /// Список БД и таблиц в них.
TableFunctionFactory table_function_factory; /// Табличные функции.
@ -86,6 +93,7 @@ struct ContextShared
ProcessList process_list; /// Исполняющиеся в данный момент запросы.
ViewDependencies view_dependencies; /// Текущие зависимости
ConfigurationPtr users_config; /// Конфиг с секциями users, profiles и quotas.
InterserverIOHandler interserver_io_handler; /// Обработчик для межсерверной передачи данных.
/// Кластеры для distributed таблиц
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
@ -248,6 +256,13 @@ public:
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries() const;
InterserverIOHandler & getInterserverIOHandler() { return shared->interserver_io_handler; }
/// Как другие серверы могут обратиться к этому.
void setInterserverIOHost(const String & host, int port);
String getInterserverIOHost() const;
int getInterserverIOPort() const;
/// Получить запрос на CREATE таблицы.
ASTPtr getCreateQuery(const String & database_name, const String & table_name) const;
@ -291,6 +306,9 @@ public:
void setUncompressedCache(size_t max_size_in_bytes);
UncompressedCachePtr getUncompressedCache() const;
void setZooKeeper(SharedPtr<zkutil::ZooKeeper> zookeeper);
zkutil::ZooKeeper & getZooKeeper() const;
/// Создать кэш засечек указанного размера. Это можно сделать только один раз.
void setMarkCache(size_t cache_size_in_bytes);
MarkCachePtr getMarkCache() const;

View File

@ -0,0 +1,97 @@
#pragma once
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/Core/Types.h>
#include <map>
#include <Poco/Net/HTMLForm.h>
namespace DB
{
/** Обработчик запросов от других серверов.
*/
class InterserverIOEndpoint
{
public:
virtual void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) = 0;
virtual ~InterserverIOEndpoint() {}
};
typedef Poco::SharedPtr<InterserverIOEndpoint> InterserverIOEndpointPtr;
/** Сюда можно зарегистрировать сервис, обрататывающий запросы от других серверов.
* Используется для передачи кусков в ReplicatedMergeTree.
*/
class InterserverIOHandler
{
public:
void addEndpoint(const String & name, InterserverIOEndpointPtr endpoint)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (endpoint_map.count(name))
throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT);
endpoint_map[name] = endpoint;
}
void removeEndpoint(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (!endpoint_map.count(name))
throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT);
endpoint_map.erase(name);
}
InterserverIOEndpointPtr getEndpoint(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (!endpoint_map.count(name))
throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT);
return endpoint_map[name];
}
private:
typedef std::map<String, InterserverIOEndpointPtr> EndpointMap;
EndpointMap endpoint_map;
Poco::FastMutex mutex;
};
/// В конструкторе вызывает addEndpoint, в деструкторе - removeEndpoint.
class InterserverIOEndpointHolder
{
public:
InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_)
: name(name_), endpoint(endpoint_), handler(handler_)
{
handler.addEndpoint(name, endpoint);
}
InterserverIOEndpointPtr getEndpoint()
{
return endpoint;
}
~InterserverIOEndpointHolder()
{
try
{
handler.removeEndpoint(name);
}
catch (...)
{
tryLogCurrentException("~InterserverIOEndpointHolder");
}
}
private:
String name;
InterserverIOEndpointPtr endpoint;
InterserverIOHandler & handler;
};
typedef Poco::SharedPtr<InterserverIOEndpointHolder> InterserverIOEndpointHolderPtr;
}

View File

@ -18,6 +18,8 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
/// Идентификатор запроса INSERT. Используется при репликации.
String insert_id;
/// Данные для вставки
const char * data;
const char * end;

View File

@ -0,0 +1,106 @@
#pragma once
#include <zkutil/ZooKeeper.h>
#include <DB/Core/Exception.h>
namespace DB
{
/** Примитив синхронизации. Работает следующим образом:
* При создании создает неэфемерную инкрементную ноду и помечает ее как заблокированную (LOCKED).
* unlock() разблокирует ее (UNLOCKED).
* При вызове деструктора или завершении сессии в ZooKeeper, переходит в состояние ABANDONED.
* (В том числе при падении программы)
*/
class AbandonableLockInZooKeeper
{
public:
enum State
{
UNLOCKED,
LOCKED,
ABANDONED,
};
AbandonableLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_)
: zookeeper(zookeeper_), path_prefix(path_prefix_)
{
/// Создадим вспомогательную эфемерную ноду.
holder_path = zookeeper.create(temp_path + "/abandonable_lock-", "", zkutil::CreateMode::EphemeralSequential);
/// Запишем в основную ноду путь к вспомогательной.
path = zookeeper.create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
}
String getPath()
{
return path;
}
/// Распарсить число в конце пути.
UInt64 getNumber()
{
return static_cast<UInt64>(atol(path.substr(path_prefix.size()).c_str()));
}
void unlock()
{
zookeeper.remove(path);
zookeeper.remove(holder_path);
}
/// Добавляет в список действия, эквивалентные unlock().
void getUnlockOps(zkutil::Ops & ops)
{
ops.push_back(new zkutil::Op::Remove(path, -1));
ops.push_back(new zkutil::Op::Remove(holder_path, -1));
}
~AbandonableLockInZooKeeper()
{
try
{
zookeeper.tryRemove(holder_path);
zookeeper.trySet(path, ""); /// Это не обязательно.
}
catch (...)
{
tryLogCurrentException("~AbandonableLockInZooKeeper");
}
}
static State check(const String & path, zkutil::ZooKeeper & zookeeper)
{
String holder_path;
/// Если нет основной ноды, UNLOCKED.
if (!zookeeper.tryGet(path, holder_path))
return UNLOCKED;
/// Если в основной ноде нет пути к вспомогательной, ABANDONED.
if (holder_path.empty())
return ABANDONED;
/// Если вспомогательная нода жива, LOCKED.
if (zookeeper.exists(holder_path))
return LOCKED;
/// Если вспомогательной ноды нет, нужно еще раз проверить существование основной ноды,
/// потому что за это время могли успеть вызвать unlock().
/// Заодно уберем оттуда путь к вспомогательной ноде.
if (zookeeper.trySet(path, "") == zkutil::ReturnCode::Ok)
return ABANDONED;
return UNLOCKED;
}
private:
zkutil::ZooKeeper & zookeeper;
String path_prefix;
String path;
String holder_path;
};
}

View File

@ -1,14 +1,19 @@
#pragma once
#include <statdaemons/Increment.h>
#include <statdaemons/threadpool.hpp>
#include <DB/Core/SortDescription.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <DB/Storages/IStorage.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/Common/escapeForFileName.h>
#include <Poco/RWLock.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
namespace DB
{
@ -76,6 +81,9 @@ struct MergeTreeSettings
/// Сколько потоков использовать для объединения кусков.
size_t merging_threads = 2;
/// Сколько потоков использовать для загрузки кусков с других реплик и объединения кусков (для ReplicatedMergeTree).
size_t replication_threads = 4;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read = 20 * 8192;
@ -85,9 +93,8 @@ struct MergeTreeSettings
/// Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их.
size_t coarse_index_granularity = 8;
/** Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется.
* (Чтобы большие запросы не вымывали кэш.)
*/
/// Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется.
/// (Чтобы большие запросы не вымывали кэш.)
size_t max_rows_to_use_cache = 1024 * 1024;
/// Через сколько секунд удалять old_куски.
@ -125,9 +132,28 @@ public:
{
return files.empty();
}
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
static Checksums parse(const String & s)
{
ReadBufferFromString in(s);
Checksums res;
res.readText(in);
assertEOF(in);
return res;
}
};
DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0) {}
DataPart(MergeTreeData & storage_) : storage(storage_), size(0), size_in_bytes(0) {}
MergeTreeData & storage;
DayNum_t left_date;
@ -176,10 +202,11 @@ public:
Poco::File(to).remove(true);
}
void renameToOld() const
/// Переименовывает кусок, дописав к имени префикс.
void renameAddPrefix(const String & prefix) const
{
String from = storage.full_path + name + "/";
String to = storage.full_path + "old_" + name + "/";
String to = storage.full_path + prefix + name + "/";
Poco::File f(from);
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
@ -216,14 +243,20 @@ public:
&& right >= rhs.right;
}
/// Загрузить индекс и вычислить размер.
/// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже.
void loadIndex()
{
/// Размер - в количестве засечек.
if (!size)
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(storage.columns->front().first) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
size_t key_size = storage.sort_descr.size();
index.resize(key_size * size);
String index_path = storage.full_path + name + "/primary.idx";
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
ReadBufferFromFile index_file(index_path,
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
for (size_t i = 0; i < size; ++i)
for (size_t j = 0; j < key_size; ++j)
@ -297,7 +330,7 @@ public:
bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const;
/// Кладет в DataPart данные из имени кусочка.
void parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part);
void parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches = nullptr);
std::string getTableName() { return ""; }
@ -309,14 +342,30 @@ public:
*/
DataParts getDataParts();
/** Удаляет куски old_parts и добавляет кусок new_part. Если какого-нибудь из удаляемых кусков нет, бросает исключение.
/** Возвращает кусок с указанным именем или кусок, покрывающий его. Если такого нет, возвращает nullptr.
*/
void replaceParts(DataPartsVector old_parts, DataPartPtr new_part);
DataPartPtr getContainingPart(const String & part_name);
/** Переименовывает временный кусок в постоянный и добавляет его в рабочий набор.
* Если increment!=nullptr, индекс куска берется из инкремента. Иначе индекс куска не меняется.
* Предполагается, что кусок не пересекается с существующими.
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment);
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr);
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
*/
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr);
/** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора.
* Лучше использовать только когда никто не может читать или писать этот кусок
* (например, при инициализации таблицы).
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix);
/** Удаляет кусок из рабочего набора. clearOldParts удалит его файлы, если на него никто не ссылается.
*/
void removePart(DataPartPtr part);
/** Удалить неактуальные куски.
*/
@ -352,13 +401,13 @@ public:
const MergeTreeSettings settings;
const ASTPtr primary_expr_ast;
private:
ExpressionActionsPtr primary_expr;
SortDescription sort_descr;
Block primary_key_sample;
ASTPtr primary_expr_ast;
String full_path;
NamesAndTypesListPtr columns;

View File

@ -12,10 +12,11 @@ class MergeTreeDataMerger
public:
MergeTreeDataMerger(MergeTreeData & data_) : data(data_), log(&Logger::get("MergeTreeDataMerger")), canceled(false) {}
typedef boost::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> AllowedMergingPredicate;
typedef std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> AllowedMergingPredicate;
/** Выбирает, какие куски слить. Использует кучу эвристик.
* Если merge_anything_for_old_months, для кусков за прошедшие месяцы снимается ограничение на соотношение размеров.
* Если available_disk_space > 0, выбирает куски так, чтобы места на диске хватило с запасом для их слияния.
*
* can_merge - функция, определяющая, можно ли объединить пару соседних кусков.
* Эта функция должна координировать слияния со вставками и другими слияниями, обеспечивая, что:
@ -24,19 +25,20 @@ public:
*/
bool selectPartsToMerge(
MergeTreeData::DataPartsVector & what,
String & merged_name,
size_t available_disk_space,
bool merge_anything_for_old_months,
bool aggressive,
bool only_small,
const AllowedMergingPredicate & can_merge);
/// Сливает куски. Возвращает название нового куска. Если слияние отменили, возвращает пустую строку.
String mergeParts(const MergeTreeData::DataPartsVector & parts);
/// Сливает куски.
MergeTreeData::DataPartPtr mergeParts(const MergeTreeData::DataPartsVector & parts, const String & merged_name);
/// Примерное количество места на диске, нужное для мерджа. С запасом.
size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
/** Отменяет все текущие мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро отменят слияние и вернут пустую строку.
/** Отменяет все текущие мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* После этого с этим экземпляром ничего делать нельзя.
*/
void cancelAll() { canceled = true; }

View File

@ -12,9 +12,6 @@
#include <DB/Columns/ColumnNested.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
namespace DB
{

View File

@ -0,0 +1,106 @@
#pragma once
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
namespace DB
{
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_)
: storage(storage_), insert_id(insert_id_), block_index(0), log(&Logger::get("ReplicatedMergeTreeBlockOutputStream")) {}
void write(const Block & block) override
{
auto part_blocks = storage.writer.splitBlockIntoParts(block);
for (auto & current_block : part_blocks)
{
++block_index;
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
AbandonableLockInZooKeeper block_number_lock(
storage.zookeeper_path + "/block_numbers/block-",
storage.zookeeper_path + "/temp", storage.zookeeper);
UInt64 part_number = block_number_lock.getNumber();
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper.tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with this ID already exists; ignoring it");
/// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять.
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
expected_checksums.check(part->checksums);
part->remove();
/// Бросаем block_number_lock.
continue;
}
storage.data.renameTempPartAndAdd(part);
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
String checksums_str = part->checksums.toString();
/// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock.
zkutil::Ops ops;
if (!block_id.empty())
{
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
checksums_str,
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
}
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/parts/" + part->name,
"",
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/parts/" + part->name + "/checksums",
checksums_str,
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
log_entry.toString(),
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
storage.zookeeper.multi(ops);
}
}
private:
StorageReplicatedMergeTree & storage;
String insert_id;
size_t block_index;
Logger * log;
};
}

View File

@ -0,0 +1,128 @@
#pragma once
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <DB/IO/copyData.h>
namespace DB
{
class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint
{
public:
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_),
owned_storage(owned_storage_), log(&Logger::get("ReplicatedMergeTreePartsServer")) {}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override
{
String part_name = params.get("part");
LOG_TRACE(log, "Sending part " << part_name);
auto storage_lock = owned_storage->lockStructure(false);
MergeTreeData::DataPartPtr part = findPart(part_name);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
checksums.files["checksums.txt"];
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String path = data.getFullPath() + part_name + "/" + it.first;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
}
}
private:
MergeTreeData & data;
StoragePtr owned_storage;
Logger * log;
MergeTreeData::DataPartPtr findPart(const String & name)
{
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
if (part && part->name == name)
return part;
throw Exception("No part " + name + " in table");
}
};
class ReplicatedMergeTreePartsFetcher
{
public:
ReplicatedMergeTreePartsFetcher(MergeTreeData & data_) : data(data_), log(&Logger::get("ReplicatedMergeTreePartsFetcher")) {}
/// Скачивает кусок в tmp_директорию.
MergeTreeData::MutableDataPartPtr fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port)
{
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
std::make_pair("part", part_name),
std::make_pair("compress", "false")};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
if (!Poco::File(part_path).createDirectory())
throw Exception("Directory " + part_path + " already exists");
size_t files;
readBinary(files, in);
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size);
uint128 expected_hash;
readBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
}
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
data.parsePartName(part_name, *new_data_part);
new_data_part->name = "tmp_" + part_name;
new_data_part->modification_time = time(0);
new_data_part->loadIndex();
new_data_part->loadChecksums();
return new_data_part;
}
private:
MergeTreeData & data;
Logger * log;
};
}

View File

@ -1,10 +1,11 @@
#pragma once
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include "MergeTree/MergeTreeDataSelectExecutor.h"
#include "MergeTree/MergeTreeDataWriter.h"
#include "MergeTree/MergeTreeDataMerger.h"
#include "MergeTree/DiskSpaceMonitor.h"
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <statdaemons/threadpool.hpp>
namespace DB
{

View File

@ -0,0 +1,343 @@
#pragma once
#include <DB/Storages/IStorage.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <zkutil/ZooKeeper.h>
#include <zkutil/LeaderElection.h>
#include <statdaemons/threadpool.hpp>
namespace DB
{
/** Движок, использующий merge-дерево и реплицируемый через ZooKeeper.
*/
class StorageReplicatedMergeTree : public IStorage
{
public:
/** Если !attach, либо создает новую таблицу в ZK, либо добавляет реплику в существующую таблицу.
*/
static StoragePtr create(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
size_t index_granularity_,
MergeTreeData::Mode mode_ = MergeTreeData::Ordinary,
const String & sign_column_ = "",
const MergeTreeSettings & settings_ = MergeTreeSettings());
void shutdown();
~StorageReplicatedMergeTree();
std::string getName() const override
{
return "Replicated" + data.getModePrefix() + "MergeTree";
}
std::string getTableName() const override { return table_name; }
std::string getSignColumnName() const { return data.getSignColumnName(); }
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
const NamesAndTypesList & getColumnsList() const override { return data.getColumnsList(); }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(ASTPtr query) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/
void drop() override;
private:
friend class ReplicatedMergeTreeBlockOutputStream;
/// Добавляет куски в множество currently_merging.
struct CurrentlyMergingPartsTagger
{
Strings parts;
StorageReplicatedMergeTree & storage;
CurrentlyMergingPartsTagger(const Strings & parts_, StorageReplicatedMergeTree & storage_)
: parts(parts_), storage(storage_)
{
Poco::ScopedLock<Poco::FastMutex> lock(storage.currently_merging_mutex);
for (const auto & name : parts)
{
if (storage.currently_merging.count(name))
throw Exception("Tagging alreagy tagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage.currently_merging.insert(parts.begin(), parts.end());
}
~CurrentlyMergingPartsTagger()
{
try
{
Poco::ScopedLock<Poco::FastMutex> lock(storage.currently_merging_mutex);
for (const auto & name : parts)
{
if (!storage.currently_merging.count(name))
throw Exception("Untagging already untagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
storage.currently_merging.erase(name);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};
typedef Poco::SharedPtr<CurrentlyMergingPartsTagger> CurrentlyMergingPartsTaggerPtr;
/// Добавляет кусок в множество future_parts.
struct FuturePartTagger
{
String part;
StorageReplicatedMergeTree & storage;
FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_)
: part(part_), storage(storage_)
{
if (!storage.future_parts.insert(part).second)
throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
~FuturePartTagger()
{
try
{
Poco::ScopedLock<Poco::FastMutex> lock(storage.queue_mutex);
if (!storage.future_parts.erase(part))
throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};
typedef Poco::SharedPtr<FuturePartTagger> FuturePartTaggerPtr;
struct LogEntry
{
enum Type
{
GET_PART,
MERGE_PARTS,
};
String znode_name;
Type type;
String source_replica;
String new_part_name;
Strings parts_to_merge;
CurrentlyMergingPartsTaggerPtr currently_merging_tagger;
FuturePartTaggerPtr future_part_tagger;
void tagPartsAsCurrentlyMerging(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS)
currently_merging_tagger = new CurrentlyMergingPartsTagger(parts_to_merge, storage);
}
void tagPartAsFuture(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage);
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
static LogEntry parse(const String & s)
{
ReadBufferFromString in(s);
LogEntry res;
res.readText(in);
assertEOF(in);
return res;
}
};
typedef std::list<LogEntry> LogEntries;
typedef std::set<String> StringSet;
typedef std::vector<std::thread> Threads;
Context & context;
zkutil::ZooKeeper & zookeeper;
/// Куски, для которых в очереди есть задание на слияние.
StringSet currently_merging;
Poco::FastMutex currently_merging_mutex;
/** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
* В ZK записи в хронологическом порядке. Здесь - не обязательно.
*/
LogEntries queue;
Poco::FastMutex queue_mutex;
/** Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди).
* Использовать под залоченным queue_mutex.
*/
StringSet future_parts;
String table_name;
String full_path;
String zookeeper_path;
String replica_name;
String replica_path;
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния.
*/
bool is_leader_node;
InterserverIOEndpointHolderPtr endpoint_holder;
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMerger merger;
ReplicatedMergeTreePartsFetcher fetcher;
zkutil::LeaderElectionPtr leader_election;
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
/// Потоки, выполняющие действия из очереди.
Threads queue_threads;
/// Поток, выбирающий куски для слияния.
std::thread merge_selecting_thread;
Logger * log;
volatile bool shutdown_called;
StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_ = MergeTreeData::Ordinary,
const String & sign_column_ = "",
const MergeTreeSettings & settings_ = MergeTreeSettings());
/// Инициализация.
/** Проверяет, что в ZooKeeper в таблице нет данных.
*/
bool isTableEmpty();
/** Создает минимальный набор нод в ZooKeeper.
*/
void createTable();
void createReplica();
/** Отметить в ZooKeeper, что эта реплика сейчас активна.
*/
void activateReplica();
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void checkTableStructure();
/** Проверить, что множество кусков соответствует тому, что в ZK (/replicas/me/parts/).
* Если каких-то кусков, описанных в ZK нет локально, бросить исключение.
* Если какие-то локальные куски не упоминаются в ZK, удалить их.
* Но если таких слишком много, на всякий случай бросить исключение - скорее всего, это ошибка конфигурации.
*/
void checkParts();
/// Выполнение заданий из очереди.
/** Кладет в queue записи из ZooKeeper (/replicas/me/queue/).
*/
void loadQueue();
/** Копирует новые записи из логов всех реплик в очередь этой реплики.
*/
void pullLogsToQueue();
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
*/
bool shouldExecuteLogEntry(const LogEntry & entry);
/** Выполнить действие из очереди. Бросает исключение, если что-то не так.
*/
void executeLogEntry(const LogEntry & entry);
/** В бесконечном цикле обновляет очередь.
*/
void queueUpdatingThread();
/** В бесконечном цикле выполняет действия из очереди.
*/
void queueThread();
/// Выбор кусков для слияния.
void becomeLeader();
/** В бесконечном цикле выбирает куски для слияния и записывает в лог.
*/
void mergeSelectingThread();
/// Вызывается во время выбора кусков для слияния.
bool canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right);
/// Обмен кусками.
/** Бросает исключение, если куска ни у кого нет.
*/
String findActiveReplicaHavingPart(const String & part_name);
/** Скачать указанный кусок с указанной реплики.
*/
void fetchPart(const String & part_name, const String & replica_name);
};
}

View File

@ -41,6 +41,7 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
/// Если все строки во входных потоках схлопнулись, мы все равно хотим выдать хоть один блок в результат.
if (last_in_stream && merged_rows == 0 && !blocks_written)
{
LOG_INFO(log, "All rows collapsed");
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(last_positive[i]);

View File

@ -14,5 +14,19 @@ void copyData(ReadBuffer & from, WriteBuffer & to)
from.position() = from.buffer().end();
}
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
{
while (bytes > 0 && !from.eof())
{
size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
to.write(from.position(), count);
from.position() += count;
bytes -= count;
}
if (bytes > 0)
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
}

View File

@ -547,6 +547,41 @@ void Context::resetCaches() const
shared->mark_cache->reset();
}
void Context::setZooKeeper(SharedPtr<zkutil::ZooKeeper> zookeeper)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->zookeeper)
throw Exception("ZooKeeper client has already been set.", ErrorCodes::LOGICAL_ERROR);
shared->zookeeper = zookeeper;
}
zkutil::ZooKeeper & Context::getZooKeeper() const
{
if (!shared->zookeeper)
throw Exception("No ZooKeeper in Context", ErrorCodes::NO_ZOOKEEPER);
return *shared->zookeeper;
}
void Context::setInterserverIOHost(const String & host, int port)
{
shared->interserver_io_host = host;
shared->interserver_io_port = port;
}
String Context::getInterserverIOHost() const
{
return shared->interserver_io_host;
}
int Context::getInterserverIOPort() const
{
return shared->interserver_io_port;
}
void Context::initClusters()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);

View File

@ -133,7 +133,8 @@ void loadMetadata(Context & context)
}
catch (const Exception & e)
{
throw Exception("Cannot create table from metadata file " + tables[j] + ", error: " + e.displayText(),
throw Exception("Cannot create table from metadata file " + tables[j] + ", error: " + e.displayText() +
", stack trace:\n" + e.getStackTrace().toString(),
ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
}
}

View File

@ -1,4 +1,5 @@
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTInsertQuery.h>
@ -21,6 +22,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, const char
ParserString s_insert("INSERT", true, true);
ParserString s_into("INTO", true, true);
ParserString s_dot(".");
ParserString s_id("ID");
ParserString s_eq("=");
ParserStringLiteral id_p;
ParserString s_values("VALUES", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_select("SELECT", true, true);
@ -34,6 +38,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, const char
ASTPtr columns;
ASTPtr format;
ASTPtr select;
ASTPtr id;
/// Данные для вставки
const char * data = NULL;
@ -63,6 +68,17 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, const char
ws.ignore(pos, end);
if (s_id.ignore(pos, end, expected))
{
if (!s_eq.ignore(pos, end, expected))
return false;
if (!id_p.parse(pos, end, id, expected))
return false;
}
ws.ignore(pos, end);
/// Есть ли список столбцов
if (s_lparen.ignore(pos, end, expected))
{
@ -121,6 +137,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, const char
query->table = dynamic_cast<ASTIdentifier &>(*table).name;
if (id)
query->insert_id = safeGet<const String &>(dynamic_cast<ASTLiteral &>(*id).value);
if (format)
query->format = dynamic_cast<ASTIdentifier &>(*format).name;

View File

@ -409,6 +409,10 @@ void formatAST(const ASTInsertQuery & ast, std::ostream & s, size_t indent, bo
s << (hilite ? hilite_keyword : "") << "INSERT INTO " << (hilite ? hilite_none : "")
<< (!ast.database.empty() ? backQuoteIfNeed(ast.database) + "." : "") << backQuoteIfNeed(ast.table);
if (!ast.insert_id.empty())
s << (hilite ? hilite_keyword : "") << " ID = " << (hilite ? hilite_none : "")
<< mysqlxx::quote << ast.insert_id;
if (ast.columns)
{
s << " (";

View File

@ -0,0 +1,98 @@
#include "InterserverIOHTTPHandler.h"
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/IO/CompressedWriteBuffer.h>
namespace DB
{
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
HTMLForm params(request);
std::ostringstream request_ostream;
request_ostream << request.stream().rdbuf();
std::string request_string = request_ostream.str();
LOG_TRACE(log, "Request URI: " << request.getURI());
LOG_TRACE(log, "Request body: " << request_string);
std::istringstream request_istream(request_string);
/// NOTE: Тут можно сделать аутентификацию, если понадобится.
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
WriteBufferFromHTTPServerResponse out(response);
auto endpoint = server.global_context->getInterserverIOHandler().getEndpoint(endpoint_name);
if (compress)
{
CompressedWriteBuffer compressed_out(out);
endpoint->processQuery(params, compressed_out);
}
else
{
endpoint->processQuery(params, out);
}
out.finalize();
}
void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
/// Для того, чтобы работал keep-alive.
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
try
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
}
catch (Exception & e)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
}
catch (Poco::Exception & e)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
}
catch (std::exception & e)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
}
catch (...)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
std::stringstream s;
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
if (!response.sent())
response.send() << s.str() << std::endl;
LOG_ERROR(log, s.str());
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include "Server.h"
namespace DB
{
class InterserverIOHTTPHandler : public Poco::Net::HTTPRequestHandler
{
public:
InterserverIOHTTPHandler(Server & server_)
: server(server_)
, log(&Logger::get("InterserverIOHTTPHandler "))
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
private:
Server & server;
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
};
}

View File

@ -5,26 +5,25 @@
namespace DB
{
/// Обработчик http-запросов в формате OLAP-server.
class OLAPHTTPHandler : public Poco::Net::HTTPRequestHandler
/// Обработчик http-запросов в формате OLAP-server.
class OLAPHTTPHandler : public Poco::Net::HTTPRequestHandler
{
public:
OLAPHTTPHandler(Server & server_)
: server(server_)
, log(&Logger::get("OLAPHTTPHandler"))
{
public:
OLAPHTTPHandler(Server & server_)
: server(server_)
, log(&Logger::get("OLAPHTTPHandler"))
{
LOG_TRACE(log, "In constructor.");
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
private:
Server & server;
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
};
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
private:
Server & server;
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
};
}

View File

@ -14,6 +14,7 @@
#include "Server.h"
#include "HTTPHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "OLAPHTTPHandler.h"
#include "TCPHandler.h"
@ -212,6 +213,23 @@ int Server::main(const std::vector<std::string> & args)
global_context->setGlobalContext(*global_context);
global_context->setPath(config.getString("path"));
if (config.has("zookeeper"))
global_context->setZooKeeper(new zkutil::ZooKeeper(config, "zookeeper"));
if (config.has("interserver_http_port"))
{
String this_host;
if (config.has("interserver_http_host"))
this_host = config.getString("interserver_http_host");
else
this_host = Poco::Net::DNS::hostName();
String port_str = config.getString("interserver_http_port");
int port = parse<int>(port_str);
global_context->setInterserverIOHost(this_host, port);
}
std::string users_config_path = config.getString("users_config", config.getString("config-file", "config.xml"));
users_config_reloader = new UsersConfigReloader(users_config_path, global_context);
@ -253,7 +271,7 @@ int Server::main(const std::vector<std::string> & args)
Poco::Timespan keep_alive_timeout(config.getInt("keep_alive_timeout", 10), 0);
Poco::ThreadPool server_pool(3, config.getInt("max_connections", 1024));
Poco::Net::HTTPServerParams * http_params = new Poco::Net::HTTPServerParams;
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(settings.receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
@ -277,6 +295,23 @@ int Server::main(const std::vector<std::string> & args)
tcp_socket,
new Poco::Net::TCPServerParams);
/// Interserver IO HTTP
Poco::SharedPtr<Poco::Net::HTTPServer> interserver_io_http_server;
if (config.has("interserver_http_port"))
{
String port_str = config.getString("interserver_http_port");
Poco::Net::ServerSocket interserver_io_http_socket(Poco::Net::SocketAddress("[::]:"
+ port_str));
interserver_io_http_socket.setReceiveTimeout(settings.receive_timeout);
interserver_io_http_socket.setSendTimeout(settings.send_timeout);
interserver_io_http_server = new Poco::Net::HTTPServer(
new HTTPRequestHandlerFactory<InterserverIOHTTPHandler>(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
interserver_io_http_socket,
http_params);
}
/// OLAP HTTP
Poco::SharedPtr<Poco::Net::HTTPServer> olap_http_server;
if (use_olap_server)
@ -284,10 +319,6 @@ int Server::main(const std::vector<std::string> & args)
olap_parser = new OLAP::QueryParser();
olap_converter = new OLAP::QueryConverter(config);
Poco::Net::HTTPServerParams * olap_http_params = new Poco::Net::HTTPServerParams;
olap_http_params->setTimeout(settings.receive_timeout);
olap_http_params->setKeepAliveTimeout(keep_alive_timeout);
Poco::Net::ServerSocket olap_http_socket(Poco::Net::SocketAddress("[::]:" + config.getString("olap_http_port")));
olap_http_socket.setReceiveTimeout(settings.receive_timeout);
olap_http_socket.setSendTimeout(settings.send_timeout);
@ -295,12 +326,14 @@ int Server::main(const std::vector<std::string> & args)
new HTTPRequestHandlerFactory<OLAPHTTPHandler>(*this, "OLAPHTTPHandler-factory"),
server_pool,
olap_http_socket,
olap_http_params);
http_params);
}
http_server.start();
tcp_server.start();
if (use_olap_server)
if (interserver_io_http_server)
interserver_io_http_server->start();
if (olap_http_server)
olap_http_server->start();
LOG_INFO(log, "Ready for connections.");

View File

@ -2,7 +2,6 @@
#include <Yandex/time2str.h>
#include <Poco/Ext/ScopedTry.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
@ -52,11 +51,9 @@ MergeTreeData::MergeTreeData(
/// инициализируем описание сортировки
sort_descr.reserve(primary_expr_ast->children.size());
for (ASTs::iterator it = primary_expr_ast->children.begin();
it != primary_expr_ast->children.end();
++it)
for (const ASTPtr & ast : primary_expr_ast->children)
{
String name = (*it)->getColumnName();
String name = ast->getColumnName();
sort_descr.push_back(SortColumnDescription(name, 1));
}
@ -122,8 +119,18 @@ String MergeTreeData::getPartName(DayNum_t left_date, DayNum_t right_date, UInt6
}
void MergeTreeData::parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part)
void MergeTreeData::parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches_p)
{
Poco::RegularExpression::MatchVec match_vec;
if (!matches_p)
{
if (!isPartDirectory(file_name, match_vec))
throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME);
matches_p = &match_vec;
}
const Poco::RegularExpression::MatchVec & matches = *matches_p;
DateLUTSingleton & date_lut = DateLUTSingleton::instance();
part.left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
@ -183,7 +190,7 @@ void MergeTreeData::loadDataParts()
continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
parsePartName(file_name, matches, *part);
parsePartName(file_name, *part, &matches);
part->name = file_name;
/// Для битых кусков, которые могут образовываться после грубого перезапуска сервера, попытаться восстановить куски, из которых они сделаны.
@ -204,10 +211,6 @@ void MergeTreeData::loadDataParts()
continue;
}
/// Размер - в количестве засечек.
part->size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize()
/ MERGE_TREE_MARK_SIZE;
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
try
@ -292,7 +295,7 @@ void MergeTreeData::clearOldParts()
{
LOG_DEBUG(log, "'Removing' part " << (*it)->name << " (prepending old_ to its name)");
(*it)->renameToOld();
(*it)->renameAddPrefix("old_");
all_data_parts.erase(it++);
}
else
@ -641,9 +644,8 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n
Poco::RegularExpression::MatchVec matches;
Strings restored_parts;
isPartDirectory(file_name, matches);
DataPart broken_part(*this);
parsePartName(file_name, matches, broken_part);
parsePartName(file_name, broken_part);
for (int i = static_cast<int>(old_parts.size()) - 1; i >= 0; --i)
{
@ -655,7 +657,7 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n
old_parts.erase(old_parts.begin() + i);
continue;
}
parsePartName(name, matches, old_part);
parsePartName(name, old_part, &matches);
if (broken_part.contains(old_part))
{
/// Восстанавливаем все содержащиеся куски. Если некоторые из них содержатся в других, их удалит loadDataParts.
@ -680,23 +682,17 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n
return restored_parts;
}
void MergeTreeData::replaceParts(DataPartsVector old_parts, DataPartPtr new_part)
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> all_lock(all_data_parts_mutex);
for (size_t i = 0; i < old_parts.size(); ++i)
if (data_parts.end() == data_parts.find(old_parts[i]))
throw Exception("Logical error: cannot find data part " + old_parts[i]->name + " in list", ErrorCodes::LOGICAL_ERROR);
data_parts.insert(new_part);
all_data_parts.insert(new_part);
for (size_t i = 0; i < old_parts.size(); ++i)
data_parts.erase(data_parts.find(old_parts[i]));
auto removed = renameTempPartAndReplace(part, increment);
if (!removed.empty())
{
LOG_ERROR(log, "Added part " << part->name << + " covers " << toString(removed.size())
<< " existing part(s) (including " << removed[0]->name << ")");
}
}
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment)
{
LOG_TRACE(log, "Renaming.");
@ -705,25 +701,58 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in
String old_path = getFullPath() + part->name + "/";
UInt64 part_id = part->left;
/** Важно, что получение номера куска происходит атомарно с добавлением этого куска в набор.
/** Для StorageMergeTree важно, что получение номера куска происходит атомарно с добавлением этого куска в набор.
* Иначе есть race condition - может произойти слияние пары кусков, диапазоны номеров которых
* содержат ещё не добавленный кусок.
*/
if (increment)
part_id = increment->get(false);
part->left = part->right = increment->get(false);
part->left = part->right = part_id;
part->name = getPartName(part->left_date, part->right_date, part_id, part_id, 0);
part->name = getPartName(part->left_date, part->right_date, part->left, part->right, part->level);
if (data_parts.count(part))
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
String new_path = getFullPath() + part->name + "/";
/// Переименовываем кусок.
Poco::File(old_path).renameTo(new_path);
DataPartsVector res;
/// Куски, содержащиеся в part, идут в data_parts подряд, задевая место, куда вставился бы сам part.
DataParts::iterator it = data_parts.lower_bound(part);
/// Пойдем влево.
while (it != data_parts.begin())
{
--it;
if (!part->contains(**it))
{
++it;
break;
}
res.push_back(*it);
data_parts.erase(it++); /// Да, ++, а не --.
}
std::reverse(res.begin(), res.end()); /// Нужно получить куски в порядке возрастания.
/// Пойдем вправо.
while (it != data_parts.end() && part->contains(**it))
{
res.push_back(*it);
data_parts.erase(it++);
}
data_parts.insert(part);
all_data_parts.insert(part);
return res;
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
if (!data_parts.erase(part))
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
part->renameAddPrefix(prefix);
}
MergeTreeData::DataParts MergeTreeData::getDataParts()
@ -733,6 +762,34 @@ MergeTreeData::DataParts MergeTreeData::getDataParts()
return data_parts;
}
MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_name)
{
MutableDataPartPtr tmp_part(new DataPart(*this));
parsePartName(part_name, *tmp_part);
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
/// Кусок может покрываться только предыдущим или следующим в data_parts.
DataParts::iterator it = data_parts.lower_bound(tmp_part);
if (it != data_parts.end())
{
if ((*it)->name == part_name)
return *it;
if ((*it)->contains(*tmp_part))
return *it;
}
if (it != data_parts.begin())
{
--it;
if ((*it)->contains(*tmp_part))
return *it;
}
return nullptr;
}
void MergeTreeData::DataPart::Checksums::check(const Checksums & rhs) const
{

View File

@ -35,15 +35,16 @@ static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4;
/// 4) Если в одном из потоков идет мердж крупных кусков, то во втором сливать только маленькие кусочки
/// 5) С ростом логарифма суммарного размера кусочков в мердже увеличиваем требование сбалансированности
bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & parts, size_t available_disk_space,
bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & parts, String & merged_name, size_t available_disk_space,
bool merge_anything_for_old_months, bool aggressive, bool only_small, const AllowedMergingPredicate & can_merge)
{
LOG_DEBUG(log, "Selecting parts to merge");
MergeTreeData::DataParts data_parts = data.getDataParts();
DateLUTSingleton & date_lut = DateLUTSingleton::instance();
if (available_disk_space == 0)
available_disk_space = std::numeric_limits<size_t>::max();
size_t min_max = -1U;
size_t min_min = -1U;
int max_len = 0;
@ -209,26 +210,34 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
{
parts.clear();
DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
UInt32 level = 0;
MergeTreeData::DataParts::iterator it = best_begin;
for (int i = 0; i < max_len; ++i)
{
parts.push_back(*it);
level = std::max(level, parts[i]->level);
left_date = std::min(left_date, parts[i]->left_date);
right_date = std::max(right_date, parts[i]->right_date);
++it;
}
merged_name = MergeTreeData::getPartName(
left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
}
else
{
LOG_DEBUG(log, "No parts to merge");
}
return found;
}
/// parts должны быть отсортированы.
String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & parts)
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & parts, const String & merged_name)
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
@ -237,25 +246,9 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
for (const auto & it : columns_list)
all_column_names.push_back(it.first);
DateLUTSingleton & date_lut = DateLUTSingleton::instance();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->left_date = std::numeric_limits<UInt16>::max();
new_data_part->right_date = std::numeric_limits<UInt16>::min();
new_data_part->left = parts.front()->left;
new_data_part->right = parts.back()->right;
new_data_part->level = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
new_data_part->level = std::max(new_data_part->level, parts[i]->level);
new_data_part->left_date = std::min(new_data_part->left_date, parts[i]->left_date);
new_data_part->right_date = std::max(new_data_part->right_date, parts[i]->right_date);
}
++new_data_part->level;
new_data_part->name = MergeTreeData::getPartName(
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
data.parsePartName(merged_name, *new_data_part);
new_data_part->name = "tmp_" + merged_name;
/** Читаем из всех кусков, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
@ -292,8 +285,7 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
}
String new_part_tmp_path = data.getFullPath() + "tmp_" + new_data_part->name + "/";
String new_part_res_path = data.getFullPath() + new_data_part->name + "/";
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, data.getColumnsList());
@ -305,18 +297,15 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
to->write(block);
if (canceled)
{
LOG_INFO(log, "Canceled merging parts.");
return "";
}
throw Exception("Canceled merging parts", ErrorCodes::ABORTED);
merged_stream->readSuffix();
new_data_part->checksums = to->writeSuffixAndGetChecksums();
new_data_part->index.swap(to->getIndex());
/// В обычном режиме строчки не могут удалиться при мердже.
if (0 == to->marksCount() && data.mode == MergeTreeData::Ordinary)
/// Для удобства, даже CollapsingSortedBlockInputStream не может выдать ноль строк.
if (0 == to->marksCount())
throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);
new_data_part->size = to->marksCount();
@ -324,19 +313,37 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
if (0 == to->marksCount())
{
LOG_INFO(log, "All rows have been deleted while merging from " << parts.front()->name << " to " << parts.back()->name);
return "";
throw Exception("All rows have been deleted while merging from " + parts.front()->name
+ " to " + parts.back()->name, ErrorCodes::LOGICAL_ERROR);
}
/// Переименовываем кусок.
Poco::File(new_part_tmp_path).renameTo(new_part_res_path);
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part);
/// Добавляем новый кусок в набор.
data.replaceParts(parts, new_data_part);
if (new_data_part->name != merged_name)
LOG_ERROR(log, "Unexpected part name: " << new_data_part->name << " instead of " << merged_name);
/// Проверим, что удалились все исходные куски и только они.
if (replaced_parts.size() != parts.size())
{
LOG_ERROR(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
{
if (parts[i]->name != replaced_parts[i]->name)
{
LOG_ERROR(log, "Unexpected part removed when adding " << new_data_part->name << ": " << replaced_parts[i]->name
<< " instead of " << parts[i]->name);
}
}
}
LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
return new_data_part->name;
return new_data_part;
}
size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts)

View File

@ -21,6 +21,7 @@
#include <DB/Storages/StorageChunks.h>
#include <DB/Storages/StorageChunkRef.h>
#include <DB/Storages/StorageChunkMerger.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
@ -29,6 +30,15 @@
namespace DB
{
static bool endsWith(const std::string & s, const std::string & suffix)
{
return s.size() >= suffix.size() && s.substr(s.size() - suffix.size()) == suffix;
}
static bool startsWith(const std::string & s, const std::string & prefix)
{
return s.size() >= prefix.size() && s.substr(0, prefix.size()) == prefix;
}
/** Для StorageMergeTree: достать первичный ключ в виде ASTExpressionList.
* Он может быть указан в кортеже: (CounterID, Date),
@ -176,80 +186,120 @@ StoragePtr StorageFactory::get(
return StorageDistributed::create(table_name, columns, remote_database, remote_table, cluster_name,
context, sign_column_name);
}
else if (name == "MergeTree" || name == "SummingMergeTree")
else if (endsWith(name, "MergeTree"))
{
/** В качестве аргумента для движка должно быть указано:
/** Движки [Replicated][Summing|Collapsing]MergeTree (6 комбинаций)
* В качестве аргумента для движка должно быть указано:
* - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper
* - имя столбца с датой;
* - имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - выражение для сортировки в скобках;
* - index_granularity.
* Например: ENGINE = MergeTree(EventDate, intHash32(UniqID), (CounterID, EventDate, intHash32(UniqID), EventTime), 8192).
*
* SummingMergeTree - вариант, в котором при слиянии делается суммирование всех числовых столбцов кроме PK
* - для Баннерной Крутилки.
*/
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage " + name + " requires 3 or 4 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3 && args.size() != 4)
throw Exception("Storage " + name + " requires 3 or 4 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
size_t arg_offset = args.size() - 3;
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
ASTPtr sampling_expression = arg_offset == 0 ? NULL : args[1];
UInt64 index_granularity = safeGet<UInt64>(dynamic_cast<ASTLiteral &>(*args[arg_offset + 2]).value);
ASTPtr primary_expr_list = extractPrimaryKey(args[arg_offset + 1], name);
return StorageMergeTree::create(
data_path, table_name, columns, context, primary_expr_list, date_column_name, sampling_expression, index_granularity,
name == "SummingMergeTree" ? MergeTreeData::Summing : MergeTreeData::Ordinary);
}
else if (name == "CollapsingMergeTree")
{
/** В качестве аргумента для движка должно быть указано:
* - имя столбца с датой;
* - имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - выражение для сортировки в скобках;
* - (не обязательно) имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких);
* - index_granularity;
* - имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
* Например: ENGINE = CollapsingMergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
* - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
* Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
*/
String name_part = name.substr(0, name.size() - strlen("MergeTree"));
bool replicated = startsWith(name_part, "Replicated");
if (replicated)
name_part = name_part.substr(strlen("Replicated"));
MergeTreeData::Mode mode = MergeTreeData::Ordinary;
if (name_part == "Collapsing")
mode = MergeTreeData::Collapsing;
else if (name_part == "Summing")
mode = MergeTreeData::Summing;
else if (!name_part.empty())
throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE);
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage CollapsingMergeTree requires 4 or 5 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity, sign_column.",
ASTs args;
if (args_func.size() == 1)
args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
size_t additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing ? 1 : 0);
if (args.size() != additional_params + 3 && args.size() != additional_params + 4)
{
String params;
if (replicated)
params += "path in ZooKeeper, replica name, ";
params += "name of column with date, [name of column for sampling], primary key expression, index granularity";
if (mode == MergeTreeData::Collapsing)
params += "sign column";
throw Exception("Storage " + name + " requires " + toString(additional_params + 3) + " or "
+ toString(additional_params + 4) +" parameters: " + params,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
String zookeeper_path;
String replica_name;
if (args.size() != 4 && args.size() != 5)
throw Exception("Storage CollapsingMergeTree requires 4 or 5 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity, sign_column.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String date_column_name;
ASTPtr primary_expr_list;
ASTPtr sampling_expression;
UInt64 index_granularity;
size_t arg_offset = args.size() - 4;
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
ASTPtr sampling_expression = arg_offset == 0 ? NULL : args[1];
UInt64 index_granularity = safeGet<UInt64>(dynamic_cast<ASTLiteral &>(*args[arg_offset + 2]).value);
String sign_column_name = dynamic_cast<ASTIdentifier &>(*args[arg_offset + 3]).name;
String sign_column_name;
ASTPtr primary_expr_list = extractPrimaryKey(args[arg_offset + 1], name);
if (replicated)
{
auto ast = dynamic_cast<ASTLiteral *>(&*args[0]);
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
else
throw Exception("Path in ZooKeeper must be a string literal", ErrorCodes::BAD_ARGUMENTS);
return StorageMergeTree::create(
data_path, table_name, columns, context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, MergeTreeData::Collapsing, sign_column_name);
ast = dynamic_cast<ASTLiteral *>(&*args[1]);
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
else
throw Exception("Replica name must be a string literal", ErrorCodes::BAD_ARGUMENTS);
args.erase(args.begin(), args.begin() + 2);
}
if (mode == MergeTreeData::Collapsing)
{
if (auto ast = dynamic_cast<ASTIdentifier *>(&*args.back()))
sign_column_name = ast->name;
else
throw Exception("Sign column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
args.pop_back();
}
if (args.size() == 4)
{
sampling_expression = args[1];
args.erase(args.begin() + 1);
}
if (auto ast = dynamic_cast<ASTIdentifier *>(&*args[0]))
date_column_name = ast->name;
else
throw Exception("Date column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
primary_expr_list = extractPrimaryKey(args[1], name);
auto ast = dynamic_cast<ASTLiteral *>(&*args[2]);
if (ast && ast->value.getType() == Field::Types::UInt64)
index_granularity = safeGet<UInt64>(ast->value);
else
throw Exception("Index granularity must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
if (replicated)
return StorageReplicatedMergeTree::create(zookeeper_path, replica_name, attach, data_path, table_name,
columns, context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, mode, sign_column_name);
else
return StorageMergeTree::create(data_path, table_name,
columns, context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, mode, sign_column_name);
}
else if (name == "SystemNumbers")
{

View File

@ -145,12 +145,13 @@ void StorageMergeTree::mergeThread(bool while_can, bool aggressive)
/// К концу этого логического блока должен быть вызван деструктор, чтобы затем корректно определить удаленные куски
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
CurrentlyMergingPartsTaggerPtr merging_tagger;
String merged_name;
{
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
MergeTreeData::DataPartsVector parts;
auto can_merge = boost::bind(&StorageMergeTree::canMergeParts, this, _1, _2);
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
bool only_small = false;
/// Если есть активный мердж крупных кусков, то ограничиваемся мерджем только маленьких частей.
@ -163,14 +164,19 @@ void StorageMergeTree::mergeThread(bool while_can, bool aggressive)
}
}
if (!merger.selectPartsToMerge(parts, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, disk_space, true, aggressive, only_small, can_merge))
LOG_DEBUG(log, "Selecting parts to merge");
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
{
LOG_DEBUG(log, "No parts to merge");
break;
}
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
}
merger.mergeParts(merging_tagger->parts);
merger.mergeParts(merging_tagger->parts, merged_name);
}
if (shutdown_called)

View File

@ -0,0 +1,978 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
namespace DB
{
const auto QUEUE_UPDATE_SLEEP = std::chrono::seconds(5);
const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5);
const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1);
const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0);
const auto MERGE_SELECTING_SLEEP = std::chrono::seconds(5);
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const MergeTreeSettings & settings_)
:
context(context_), zookeeper(context.getZooKeeper()),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
replica_name(replica_name_), is_leader_node(false),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_,mode_, sign_column_, settings_),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get("StorageReplicatedMergeTree: " + table_name)),
shutdown_called(false)
{
if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/')
zookeeper_path.erase(zookeeper_path.end() - 1);
replica_path = zookeeper_path + "/replicas/" + replica_name;
if (!attach)
{
if (!zookeeper.exists(zookeeper_path))
createTable();
if (!isTableEmpty())
throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);
checkTableStructure();
createReplica();
}
else
{
checkTableStructure();
checkParts();
}
loadQueue();
activateReplica();
leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
for (size_t i = 0; i < settings_.replication_threads; ++i)
queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}
StoragePtr StorageReplicatedMergeTree::create(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const MergeTreeSettings & settings_)
{
StorageReplicatedMergeTree * res = new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach,
path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
String endpoint_name = "ReplicatedMergeTree:" + res->replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr);
res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler());
return res_ptr;
}
static String formattedAST(const ASTPtr & ast)
{
if (!ast)
return "";
std::stringstream ss;
formatAST(*ast, ss, 0, false, true);
return ss.str();
}
void StorageReplicatedMergeTree::createTable()
{
zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
std::stringstream metadata;
metadata << "metadata format version: 1" << std::endl;
metadata << "date column: " << data.date_column_name << std::endl;
metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
metadata << "index granularity: " << data.index_granularity << std::endl;
metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
metadata << "sign column: " << data.sign_column << std::endl;
metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
metadata << "columns:" << std::endl;
WriteBufferFromOStream buf(metadata);
for (auto & it : data.getColumnsList())
{
writeBackQuotedString(it.first, buf);
writeChar(' ', buf);
writeString(it.second->getName(), buf);
writeChar('\n', buf);
}
buf.next();
zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
}
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void StorageReplicatedMergeTree::checkTableStructure()
{
String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
assertString("metadata format version: 1", buf);
assertString("\ndate column: ", buf);
assertString(data.date_column_name, buf);
assertString("\nsampling expression: ", buf);
assertString(formattedAST(data.sampling_expression), buf);
assertString("\nindex granularity: ", buf);
assertString(toString(data.index_granularity), buf);
assertString("\nmode: ", buf);
assertString(toString(static_cast<int>(data.mode)), buf);
assertString("\nsign column: ", buf);
assertString(data.sign_column, buf);
assertString("\nprimary key: ", buf);
assertString(formattedAST(data.primary_expr_ast), buf);
assertString("\ncolumns:\n", buf);
for (auto & it : data.getColumnsList())
{
String name;
readBackQuotedString(name, buf);
if (name != it.first)
throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
ErrorCodes::UNKNOWN_IDENTIFIER);
assertString(" ", buf);
assertString(it.second->getName(), buf);
assertString("\n", buf);
}
assertEOF(buf);
}
void StorageReplicatedMergeTree::createReplica()
{
zookeeper.create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
}
void StorageReplicatedMergeTree::activateReplica()
{
std::stringstream host;
host << "host: " << context.getInterserverIOHost() << std::endl;
host << "port: " << context.getInterserverIOPort() << std::endl;
/// Одновременно объявим, что эта реплика активна, и обновим хост.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
try
{
zookeeper.multi(ops);
}
catch (zkutil::KeeperException & e)
{
if (e.code == zkutil::ReturnCode::NodeExists)
throw Exception("Replica " + replica_path + " appears to be already active. If you're sure it's not, "
"try again in a minute or remove znode " + replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
throw;
}
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
}
bool StorageReplicatedMergeTree::isTableEmpty()
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
for (const auto & replica : replicas)
{
if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
return false;
}
return true;
}
void StorageReplicatedMergeTree::checkParts()
{
Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getDataParts();
MergeTreeData::DataPartsVector unexpected_parts;
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
{
expected_parts.erase(part->name);
}
else
{
unexpected_parts.push_back(part);
}
}
if (!expected_parts.empty())
throw Exception("Not found " + toString(expected_parts.size())
+ " parts (including " + *expected_parts.begin() + ") in table " + data.getTableName(),
ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
if (unexpected_parts.size() > 1)
throw Exception("More than one unexpected part (including " + unexpected_parts[0]->name
+ ") in table " + data.getTableName(),
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
for (MergeTreeData::DataPartPtr part : unexpected_parts)
{
LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
data.renameAndDetachPart(part, "ignored_");
}
}
void StorageReplicatedMergeTree::loadQueue()
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
Strings children = zookeeper.getChildren(replica_path + "/queue");
std::sort(children.begin(), children.end());
for (const String & child : children)
{
String s = zookeeper.get(replica_path + "/queue/" + child);
LogEntry entry = LogEntry::parse(s);
entry.znode_name = child;
entry.tagPartsAsCurrentlyMerging(*this);
queue.push_back(entry);
}
}
void StorageReplicatedMergeTree::pullLogsToQueue()
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Сольем все логи в хронологическом порядке.
struct LogIterator
{
String replica; /// Имя реплики.
UInt64 index; /// Номер записи в логе (суффикс имени ноды).
Int64 timestamp; /// Время (czxid) создания записи в логе.
String entry_str; /// Сама запись.
bool operator<(const LogIterator & rhs) const
{
return timestamp < rhs.timestamp;
}
bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
{
String index_str = toString(index);
while (index_str.size() < 10)
index_str = '0' + index_str;
zkutil::Stat stat;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
return false;
timestamp = stat.getczxid();
return true;
}
};
typedef std::priority_queue<LogIterator> PriorityQueue;
PriorityQueue priority_queue;
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
String index_str;
UInt64 index;
if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str))
{
index = parse<UInt64>(index_str);
}
else
{
/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log");
std::sort(entries.begin(), entries.end());
index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
}
LogIterator iterator;
iterator.replica = replica;
iterator.index = index;
if (iterator.readEntry(zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
size_t count = 0;
while (!priority_queue.empty())
{
LogIterator iterator = priority_queue.top();
priority_queue.pop();
++count;
LogEntry entry = LogEntry::parse(iterator.entry_str);
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
auto results = zookeeper.multi(ops);
String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
entry.tagPartsAsCurrentlyMerging(*this);
queue.push_back(entry);
++iterator.index;
if (iterator.readEntry(zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
if (count > 0)
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
}
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{
if (entry.type == LogEntry::MERGE_PARTS)
{
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
* Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
* Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
* Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
*/
for (const auto & name : entry.parts_to_merge)
{
if (future_parts.count(name))
{
LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " yet because part " << name << " is not ready yet.");
return false;
}
}
}
return true;
}
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
{
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
if (containing_part && zookeeper.exists(replica_path + "/parts/" + containing_part->name))
{
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
return;
}
}
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist. This is a bug.");
bool do_fetch = false;
if (entry.type == LogEntry::GET_PART)
{
do_fetch = true;
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
MergeTreeData::DataPartsVector parts;
bool have_all_parts = true;;
for (const String & name : entry.parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
if (!part)
{
have_all_parts = false;
break;
}
if (part->name != name)
{
LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
<< " but should be merged into " << entry.new_part_name);
have_all_parts = false;
break;
}
parts.push_back(part);
}
if (!have_all_parts)
{
/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
}
else
{
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
"",
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
part->checksums.toString(),
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
for (const auto & part : parts)
{
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
}
zookeeper.multi(ops);
parts.clear();
data.clearOldParts();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
}
else
{
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
}
if (do_fetch)
{
try
{
String replica = findActiveReplicaHavingPart(entry.new_part_name);
fetchPart(entry.new_part_name, replica);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
}
catch (...)
{
/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
* а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
* для этого мерджа в конец очереди.
*
*/
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Найдем действие по объединению этого куска с другими. Запомним других.
StringSet parts_for_merge;
LogEntries::iterator merge_entry;
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (it->type == LogEntry::MERGE_PARTS)
{
if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
!= it->parts_to_merge.end())
{
parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
merge_entry = it;
break;
}
}
}
if (!parts_for_merge.empty())
{
/// Переместим в конец очереди действия, получающие parts_for_merge.
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
auto it0 = it;
++it;
if (it0 == merge_entry)
break;
if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
&& parts_for_merge.count(it0->new_part_name))
{
queue.splice(queue.end(), queue, it0, it);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw;
}
}
}
void StorageReplicatedMergeTree::queueUpdatingThread()
{
while (!shutdown_called)
{
try
{
pullLogsToQueue();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
}
}
void StorageReplicatedMergeTree::queueThread()
{
while (!shutdown_called)
{
LogEntry entry;
bool have_work = false;
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
{
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (shouldExecuteLogEntry(*it))
{
entry = *it;
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!have_work)
{
std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
continue;
}
bool success = false;
try
{
executeLogEntry(entry);
auto code = zookeeper.tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != zkutil::ReturnCode::Ok)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
success = true;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called)
break;
if (success)
{
entry.currently_merging_tagger = nullptr;
std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
}
else
{
{
/// Добавим действие, которое не получилось выполнить, в конец очереди.
entry.future_part_tagger = nullptr;
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
entry.currently_merging_tagger = nullptr;
std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
}
}
}
void StorageReplicatedMergeTree::mergeSelectingThread()
{
pullLogsToQueue();
while (!shutdown_called && is_leader_node)
{
bool success = false;
try
{
size_t merges_queued = 0;
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
for (const auto & entry : queue)
if (entry.type == LogEntry::MERGE_PARTS)
++merges_queued;
}
if (merges_queued >= data.settings.merging_threads)
{
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
continue;
}
/// Есть ли активный мердж крупных кусков.
bool has_big_merge = false;
{
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
for (const auto & name : currently_merging)
{
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
if (!part)
continue;
if (part->name != name)
{
LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in" << part->name);
continue;
}
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
has_big_merge = true;
break;
}
}
}
MergeTreeData::DataPartsVector parts;
{
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
String merged_name;
auto can_merge = std::bind(
&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
LOG_TRACE(log, "Selecting parts to merge" << (has_big_merge ? " (only small)" : ""));
if (merger.selectPartsToMerge(parts, merged_name, 0, false, false, has_big_merge, can_merge) ||
merger.selectPartsToMerge(parts, merged_name, 0, true, false, has_big_merge, can_merge))
{
LogEntry entry;
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
for (const auto & part : parts)
{
entry.parts_to_merge.push_back(part->name);
}
zookeeper.create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
success = true;
}
}
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы куски пометились как currently_merging).
pullLogsToQueue();
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Уберем больше не нужные отметки о несуществующих блоках.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/block-" + number_str;
zookeeper.tryRemove(path);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called)
break;
if (!success)
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
}
}
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
if (currently_merging.count(left->name) || currently_merging.count(right->name))
return false;
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/block-" + number_str;
if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{
LOG_DEBUG(log, "Can't merge parts " << left->name << " and " << right->name << " because block " << path << " exists");
return false;
}
}
return true;
}
void StorageReplicatedMergeTree::becomeLeader()
{
LOG_INFO(log, "Became leader");
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
}
String StorageReplicatedMergeTree::findActiveReplicaHavingPart(const String & part_name)
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
/// Из реплик, у которых есть кусок, выберем одну равновероятно.
std::random_shuffle(replicas.begin(), replicas.end());
for (const String & replica : replicas)
{
if (zookeeper.exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
zookeeper.exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
return replica;
}
throw Exception("No active replica has part " + part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
{
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);
auto table_lock = lockStructure(true);
String host;
int port;
String host_port_str = zookeeper.get(zookeeper_path + "/replicas/" + replica_name + "/host");
ReadBufferFromString buf(host_port_str);
assertString("host: ", buf);
readString(host, buf);
assertString("\nport: ", buf);
readText(port, buf);
assertString("\n", buf);
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
auto removed_parts = data.renameTempPartAndReplace(part);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
"",
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
part->checksums.toString(),
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
for (const auto & removed_part : removed_parts)
{
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + removed_part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + removed_part->name, -1));
}
zookeeper.multi(ops);
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part");
}
void StorageReplicatedMergeTree::shutdown()
{
if (shutdown_called)
return;
leader_election = nullptr;
shutdown_called = true;
replica_is_active_node = nullptr;
endpoint_holder = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
if (is_leader_node)
merge_selecting_thread.join();
queue_updating_thread.join();
for (auto & thread : queue_threads)
thread.join();
LOG_TRACE(log, "Threads finished");
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
try
{
shutdown();
}
catch(...)
{
tryLogCurrentException("~StorageReplicatedMergeTree");
}
}
BlockInputStreams StorageReplicatedMergeTree::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
return reader.read(column_names, query, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
String insert_id;
if (ASTInsertQuery * insert = dynamic_cast<ASTInsertQuery *>(&*query))
insert_id = insert->insert_id;
return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
void StorageReplicatedMergeTree::drop()
{
shutdown();
replica_is_active_node = nullptr;
zookeeper.removeRecursive(replica_path);
if (zookeeper.getChildren(zookeeper_path + "/replicas").empty())
zookeeper.removeRecursive(zookeeper_path);
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);
writeString("source replica: ", out);
writeString(source_replica, out);
writeString("\n", out);
switch (type)
{
case GET_PART:
writeString("get\n", out);
writeString(new_part_name, out);
break;
case MERGE_PARTS:
writeString("merge\n", out);
for (const String & s : parts_to_merge)
{
writeString(s, out);
writeString("\n", out);
}
writeString("into\n", out);
writeString(new_part_name, out);
break;
}
writeString("\n", out);
}
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
String type_str;
assertString("format version: 1\n", in);
assertString("source replica: ", in);
readString(source_replica, in);
assertString("\n", in);
readString(type_str, in);
assertString("\n", in);
if (type_str == "get")
{
type = GET_PART;
readString(new_part_name, in);
}
else if (type_str == "merge")
{
type = MERGE_PARTS;
while (true)
{
String s;
readString(s, in);
assertString("\n", in);
if (s == "into")
break;
parts_to_merge.push_back(s);
}
readString(new_part_name, in);
}
assertString("\n", in);
}
}

View File

@ -15,6 +15,10 @@ public:
KeeperException(ReturnCode::type code_)
: DB::Exception(ReturnCode::toString(code_)), code(code_) {}
const char * name() const throw() { return "zkutil::KeeperException"; }
const char * className() const throw() { return "zkutil::KeeperException"; }
KeeperException * clone() const { return new KeeperException(message(), code); }
ReturnCode::type code;
};

View File

@ -0,0 +1,103 @@
#pragma once
#include <zkutil/ZooKeeper.h>
#include <functional>
#include <Yandex/logger_useful.h>
namespace zkutil
{
/** Реализует метод выбора лидера, описанный здесь: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
*/
class LeaderElection
{
public:
typedef std::function<void()> LeadershipHandler;
/** handler вызывается, когда этот экземпляр становится лидером.
*/
LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
: path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_),
shutdown(false), log(&Logger::get("LeaderElection"))
{
node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
thread = std::thread(&LeaderElection::threadFunction, this);
}
~LeaderElection()
{
shutdown = true;
thread.join();
}
private:
std::string path;
ZooKeeper & zookeeper;
LeadershipHandler handler;
std::string identifier;
EphemeralNodeHolderPtr node;
std::string node_name;
std::thread thread;
volatile bool shutdown;
Logger * log;
void threadFunction()
{
try
{
while (!shutdown)
{
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());
auto it = std::lower_bound(children.begin(), children.end(), node_name);
if (it == children.end() || *it != node_name)
throw Poco::Exception("Assertion failed in LeaderElection");
if (it == children.begin())
{
handler();
return;
}
WatchFuture future;
if (zookeeper.exists(*(it - 1), nullptr, &future))
{
while (!shutdown)
if (future.wait_for(std::chrono::seconds(2)) != std::future_status::timeout)
break;
}
}
}
catch (const DB::Exception & e)
{
LOG_ERROR(log, "Exception in LeaderElection: Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception in LeaderElection: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception in LeaderElection: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception in LeaderElection");
}
}
};
typedef Poco::SharedPtr<LeaderElection> LeaderElectionPtr;
}

View File

@ -36,6 +36,8 @@ public:
ZooKeeper(const Poco::Util::LayeredConfiguration & config, const std::string & config_name,
WatchFunction * watch = nullptr);
~ZooKeeper();
/** Возвращает true, если сессия навсегда завершена.
* Это возможно только если соединение было установлено, а потом разорвалось. Это достаточно редкая ситуация.
* С другой стороны, если, например, указан неправильный сервер или порт, попытки соединения будут продолжаться бесконечно,
@ -73,7 +75,7 @@ public:
bool exists(const std::string & path, Stat * stat = nullptr, WatchFuture * watch = nullptr);
std::string get(const std::string & path, Stat * stat, WatchFuture * watch);
std::string get(const std::string & path, Stat * stat = nullptr, WatchFuture * watch = nullptr);
/** Не бросает исключение при следующих ошибках:
* - Такой ноды нет. В таком случае возвращает false.
@ -108,19 +110,77 @@ public:
OpResultsPtr multi(const Ops & ops);
/** Бросает исключение только если какая-нибудь операция вернула "неожиданную" ошибку - такую ошибку,
* увидев которую соответствующий метод try* бросил бы исключение. */
* увидев которую соответствующий метод try* бросил бы исключение. */
OpResultsPtr tryMulti(const Ops & ops);
/** Удаляет ноду вместе с поддеревом. Если в это время кто-то добавит иили удалит ноду в поддереве, результат не определен.
*/
void removeRecursive(const std::string & path);
private:
void init(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_);
friend struct StateWatch;
zk::ZooKeeper impl;
ACLs default_acl;
WatchFunction * state_watch;
Poco::FastMutex mutex;
ACLs default_acl;
SessionState::type session_state;
void stateChanged(WatchEvent::type event, SessionState::type state, const std::string& path);
};
/** В конструкторе создает эфемерную ноду, в деструкторе - удаляет.
*/
class EphemeralNodeHolder
{
public:
typedef Poco::SharedPtr<EphemeralNodeHolder> Ptr;
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
: path(path_), zookeeper(zookeeper_)
{
if (create)
path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
}
std::string getPath() const
{
return path;
}
static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
return new EphemeralNodeHolder(path, zookeeper, true, false, data);
}
static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
return new EphemeralNodeHolder(path, zookeeper, true, true, data);
}
static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
{
return new EphemeralNodeHolder(path, zookeeper, false, false, "");
}
~EphemeralNodeHolder()
{
try
{
zookeeper.tryRemove(path);
}
catch (KeeperException) {}
}
private:
std::string path;
ZooKeeper & zookeeper;
};
typedef EphemeralNodeHolder::Ptr EphemeralNodeHolderPtr;
}

View File

@ -1,5 +1,6 @@
#include <zkutil/ZooKeeper.h>
#include <boost/make_shared.hpp>
#include <Yandex/logger_useful.h>
#define CHECKED(x) { ReturnCode::type code = x; if (code != ReturnCode::Ok) throw KeeperException(code); }
@ -68,7 +69,6 @@ struct ZooKeeperArgs
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
std::string node_key = "node";
std::string node_key_ext = "node[";
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
for (const auto & key : keys)
@ -76,7 +76,7 @@ struct ZooKeeperArgs
if (key == node_key || key.compare(0, node_key.size(), node_key) == 0)
{
if (hosts.size())
hosts += std::string(" ");
hosts += std::string(",");
hosts += config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port");
}
else if (key == "session_timeout_ms")
@ -100,6 +100,8 @@ ZooKeeper::ZooKeeper(const Poco::Util::LayeredConfiguration & config, const std:
void ZooKeeper::stateChanged(WatchEvent::type event, SessionState::type state, const std::string & path)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
session_state = state;
if (state_watch)
(*state_watch)(event, state, path);
@ -107,16 +109,22 @@ void ZooKeeper::stateChanged(WatchEvent::type event, SessionState::type state, c
bool ZooKeeper::disconnected()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return session_state == SessionState::Expired || session_state == SessionState::AuthFailed;
}
void ZooKeeper::setDefaultACL(ACLs & acl)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
default_acl = acl;
}
ACLs ZooKeeper::getDefaultACL()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return default_acl;
}
@ -148,6 +156,8 @@ bool ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
std::string ZooKeeper::create(const std::string & path, const std::string & data, CreateMode::type mode)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::string res;
CHECKED(impl.create(path, data, default_acl, mode, res));
return res;
@ -155,6 +165,8 @@ std::string ZooKeeper::create(const std::string & path, const std::string & data
ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
ReturnCode::type code = impl.create(path, data, default_acl, mode, pathCreated);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode ||
@ -291,9 +303,29 @@ OpResultsPtr ZooKeeper::tryMulti(const Ops & ops)
return res;
}
void ZooKeeper::removeRecursive(const std::string & path)
{
Strings children = getChildren(path);
for (const std::string & child : children)
removeRecursive(path + "/" + child);
remove(path);
}
void ZooKeeper::close()
{
CHECKED(impl.close());
}
ZooKeeper::~ZooKeeper()
{
try
{
close();
}
catch(...)
{
LOG_ERROR(&Logger::get("~ZooKeeper"), "Failed to close ZooKeeper session");
}
}
}