Michael Kolupaev 2014-04-02 17:45:39 +04:00
parent 83fce6973a
commit b92f1ff480
15 changed files with 249 additions and 83 deletions

View File

@ -239,6 +239,7 @@ namespace ErrorCodes
NOT_FOUND_EXPECTED_DATA_PART,
TOO_MANY_UNEXPECTED_DATA_PARTS,
NO_SUCH_DATA_PART,
BAD_DATA_PART_NAME,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -24,32 +24,42 @@ class ReadBufferFromHTTP : public ReadBuffer
private:
std::string host;
int port;
std::string params;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// owned by the session
Poco::SharedPtr<ReadBufferFromIStream> impl;
public:
typedef std::vector<std::pair<String, String> > Params;
ReadBufferFromHTTP(
const std::string & host_,
int port_,
const std::string & params_,
const Params & params,
size_t timeout_ = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), host(host_), port(port_), params(params_)
: ReadBuffer(NULL, 0), host(host_), port(port_)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
std::stringstream uri;
uri << "http://" << host << ":" << port << "/?" << params;
uri << "http://" << host << ":" << port << "/";
bool first = true;
for (const auto & it : params)
{
uri << (first ? "?" : "&");
first = false;
String encoded_key;
String encoded_value;
Poco::URI::encode(it.first, "=&#", encoded_key);
Poco::URI::encode(it.second, "&#", encoded_value);
uri << encoded_key << "=" << encoded_value;
}
session.setHost(host);
session.setPort(port);
/// set the timeout
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0));
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
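For illustration only, a caller of the new interface might build a request like this (the host, port and parameter values are made up); with the loop above, reserved characters in keys and values are percent-encoded and the pairs are joined with '?' and '&':
ReadBufferFromHTTP::Params params = {
    std::make_pair("action", "read"),
    std::make_pair("path", "/var/lib/data/part name")}; /// the space is sent as %20
ReadBufferFromHTTP in("example-host", 9009, params);
/// Resulting request URI: http://example-host:9009/?action=read&path=/var/lib/data/part%20name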

View File

@ -18,6 +18,7 @@
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/VarInt.h>
#include <city.h>
#define DEFAULT_MAX_STRING_SIZE 0x00FFFFFFULL
@ -417,6 +418,7 @@ inline void readBinary(Float32 & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(Float64 & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(String & x, ReadBuffer & buf) { readStringBinary(x, buf); }
inline void readBinary(bool & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(uint128 & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(VisitID_t & x, ReadBuffer & buf) { readPODBinary(x, buf); }
inline void readBinary(mysqlxx::Date & x, ReadBuffer & buf) { readPODBinary(x, buf); }

View File

@ -23,13 +23,12 @@ public:
size_t timeout = 0,
size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
std::stringstream params;
params << "action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false");
ReadBufferFromHTTP::Params params = {
std::make_pair("action", "read"),
std::make_pair("path", path),
std::make_pair("compress", (compress ? "true" : "false"))};
impl = new ReadBufferFromHTTP(host, port, params.str(), timeout, buffer_size);
impl = new ReadBufferFromHTTP(host, port, params, timeout, buffer_size);
}
bool nextImpl()
@ -48,13 +47,11 @@ public:
const std::string & path,
size_t timeout = 0)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
ReadBufferFromHTTP::Params params = {
std::make_pair("action", "list"),
std::make_pair("path", path)};
std::stringstream params;
params << "action=list&path=" << encoded_path;
ReadBufferFromHTTP in(host, port, params.str(), timeout);
ReadBufferFromHTTP in(host, port, params, timeout);
std::vector<std::string> files;
while (!in.eof())

View File

@ -18,6 +18,7 @@
#include <DB/IO/WriteIntText.h>
#include <DB/IO/VarInt.h>
#include <DB/IO/WriteBufferFromString.h>
#include <city.h>
#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U
@ -448,6 +449,7 @@ inline void writeBinary(const Float32 & x, WriteBuffer & buf) { writePODBinary(
inline void writeBinary(const Float64 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const String & x, WriteBuffer & buf) { writeStringBinary(x, buf); }
inline void writeBinary(const bool & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const uint128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const VisitID_t & x, WriteBuffer & buf) { writePODBinary(static_cast<const UInt64 &>(x), buf); }
inline void writeBinary(const mysqlxx::Date & x, WriteBuffer & buf) { writePODBinary(x, buf); }

View File

@ -12,6 +12,10 @@ namespace DB
*/
void copyData(ReadBuffer & from, WriteBuffer & to);
/** Copies `bytes` bytes from the ReadBuffer into the WriteBuffer
*/
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
}
#endif

View File

@ -8,8 +8,13 @@
#include <DB/Interpreters/ExpressionActions.h>
#include <DB/Storages/IStorage.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/Common/escapeForFileName.h>
#include <Poco/RWLock.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
namespace DB
{
@ -147,7 +152,7 @@ public:
}
};
DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0) {}
DataPart(MergeTreeData & storage_) : storage(storage_), size(0), size_in_bytes(0) {}
MergeTreeData & storage;
DayNum_t left_date;
@ -237,14 +242,20 @@ public:
&& right >= rhs.right;
}
/// Load the index and compute the size.
/// Load the index and compute the size. If size is 0, compute it as well.
void loadIndex()
{
/// The size is the number of marks.
if (!size)
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(storage.columns->front().first) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
size_t key_size = storage.sort_descr.size();
index.resize(key_size * size);
String index_path = storage.full_path + name + "/primary.idx";
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
ReadBufferFromFile index_file(index_path,
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
for (size_t i = 0; i < size; ++i)
for (size_t j = 0; j < key_size; ++j)
@ -318,7 +329,7 @@ public:
bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const;
/// Fills the DataPart with data parsed from the part name.
void parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part);
void parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches = nullptr);
std::string getTableName() { return ""; }

View File

@ -12,9 +12,6 @@
#include <DB/Columns/ColumnNested.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
namespace DB
{

View File

@ -11,7 +11,7 @@ class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_)
: storage(storage_), insert_id(insert_id_), block_index(0) {}
: storage(storage_), insert_id(insert_id_), block_index(0), log(&Logger::get("ReplicatedMergeTreeBlockOutputStream")) {}
void write(const Block & block) override
{
@ -33,6 +33,8 @@ public:
if (!block_id.empty() && storage.zookeeper.tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with this ID already exists; ignoring it");
/// A block with this ID has already been inserted before. Check the checksums and don't insert it again.
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
expected_checksums.check(part->checksums);
@ -49,28 +51,38 @@ public:
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.new_part_name = part->name;
String checksums_str = part->checksums.toString();
/// Simultaneously add information about the part to all the necessary places in ZooKeeper and release block_number_lock.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
if (!block_id.empty())
{
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
checksums_str,
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
}
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/parts/" + part->name,
"",
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/parts/" + part->name + "/checksums",
checksums_str,
storage.zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
log_entry.toString(),
@ -86,6 +98,8 @@ private:
StorageReplicatedMergeTree & storage;
String insert_id;
size_t block_index;
Logger * log;
};
}

View File

@ -0,0 +1,131 @@
#pragma once
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <DB/IO/copyData.h>
namespace DB
{
class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint
{
public:
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_),
owned_storage(owned_storage_), log(&Logger::get("ReplicatedMergeTreePartsServer")) {}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override
{
String part_name = params.get("part");
LOG_TRACE(log, "Sending part " << part_name);
auto storage_lock = owned_storage->lockStructure(false);
MergeTreeData::DataPartPtr part = findPart(part_name);
/// Take the list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
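/// operator[] default-inserts an entry for "checksums.txt", so the checksums file itself is sent along with the data files.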
checksums.files["checksums.txt"];
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String path = data.getFullPath() + part_name + "/" + it.first;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
}
}
private:
MergeTreeData & data;
StoragePtr owned_storage;
Logger * log;
MergeTreeData::DataPartPtr findPart(const String & name)
{
MergeTreeData::DataParts parts = data.getDataParts();
for (const auto & part : parts)
{
if (part->name == name)
return part;
}
throw Exception("No part " + name + " in table");
}
};
class ReplicatedMergeTreePartsFetcher
{
public:
ReplicatedMergeTreePartsFetcher(MergeTreeData & data_) : data(data_), log(&Logger::get("ReplicatedMergeTreePartsFetcher")) {}
/// Downloads a part into the tmp_ directory and verifies its checksums.
MergeTreeData::MutableDataPartPtr fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port)
{
LOG_TRACE(log, "Fetching part " << part_name);
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
std::make_pair("part", part_name)};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
if (!Poco::File(part_path).createDirectory())
throw Exception("Directory " + part_path + " already exists");
size_t files;
readBinary(files, in);
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size);
uint128 expected_hash;
readBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
}
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
data.parsePartName(part_name, *new_data_part);
new_data_part->name = "tmp_" + part_name;
new_data_part->modification_time = time(0);
new_data_part->loadIndex();
new_data_part->loadChecksums();
return new_data_part;
}
private:
MergeTreeData & data;
Logger * log;
};
}
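The server and the fetcher above share an implicit wire format: the number of files, then for each file its name, its size, the raw bytes and a hash of those bytes, which the fetcher verifies as it writes. Purely as a sketch, assuming a MergeTreeData instance named data and an invented part name, replica path, host and port, a caller of the fetcher would look roughly like this:
ReplicatedMergeTreePartsFetcher fetcher(data);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
    "20140401_20140402_2_2_0",                   /// hypothetical part name
    "/clickhouse/tables/hits/replicas/replica1", /// replica_path of the source replica
    "source-host", 9009);                        /// interserver host and port
/// On success the data is in <table path>/tmp_20140401_20140402_2_2_0/ and every file's checksum has been verified.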

View File

@ -103,23 +103,6 @@ private:
typedef std::list<LogEntry> LogEntries;
class MyInterserverIOEndpoint : public InterserverIOEndpoint
{
public:
MyInterserverIOEndpoint(StorageReplicatedMergeTree & storage_) : storage(storage_) {}
void setOwnedStorage(StoragePtr owned_storage_)
{
owned_storage = owned_storage_;
}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
StorageReplicatedMergeTree & storage;
StoragePtr owned_storage;
};
Context & context;
zkutil::ZooKeeper & zookeeper;

View File

@ -14,5 +14,19 @@ void copyData(ReadBuffer & from, WriteBuffer & to)
from.position() = from.buffer().end();
}
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
{
while (bytes > 0 && !from.eof())
{
size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
to.write(from.position(), count);
from.position() += count;
bytes -= count;
}
if (bytes > 0)
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
}
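As a minimal illustration of the new overload (the file names are made up): copy exactly the first 1024 bytes of one file into another, throwing if the source is shorter.
ReadBufferFromFile in("source.bin");
WriteBufferFromFile out("prefix.bin");
copyData(in, out, 1024); /// copies exactly 1024 bytes or throws ATTEMPT_TO_READ_AFTER_EOF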

View File

@ -2,7 +2,6 @@
#include <Yandex/time2str.h>
#include <Poco/Ext/ScopedTry.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
@ -120,8 +119,18 @@ String MergeTreeData::getPartName(DayNum_t left_date, DayNum_t right_date, UInt6
}
void MergeTreeData::parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part)
void MergeTreeData::parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches_p)
{
Poco::RegularExpression::MatchVec match_vec;
if (!matches_p)
{
if (!isPartDirectory(file_name, match_vec))
throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME);
matches_p = &match_vec;
}
const Poco::RegularExpression::MatchVec & matches = *matches_p;
DateLUTSingleton & date_lut = DateLUTSingleton::instance();
part.left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
@ -181,7 +190,7 @@ void MergeTreeData::loadDataParts()
continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
parsePartName(file_name, matches, *part);
parsePartName(file_name, *part, &matches);
part->name = file_name;
/// For broken parts, which can appear after an unclean server restart, try to restore the parts they were made from.
@ -202,10 +211,6 @@ void MergeTreeData::loadDataParts()
continue;
}
/// The size is the number of marks.
part->size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize()
/ MERGE_TREE_MARK_SIZE;
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
try
@ -618,9 +623,8 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n
Poco::RegularExpression::MatchVec matches;
Strings restored_parts;
isPartDirectory(file_name, matches);
DataPart broken_part(*this);
parsePartName(file_name, matches, broken_part);
parsePartName(file_name, broken_part);
for (int i = static_cast<int>(old_parts.size()) - 1; i >= 0; --i)
{
@ -632,7 +636,7 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n
old_parts.erase(old_parts.begin() + i);
continue;
}
parsePartName(name, matches, old_part);
parsePartName(name, old_part, &matches);
if (broken_part.contains(old_part))
{
/// Restore all contained parts. If some of them are contained in other parts, loadDataParts will remove them.

View File

@ -1,5 +1,6 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
@ -7,14 +8,6 @@
namespace DB
{
void StorageReplicatedMergeTree::MyInterserverIOEndpoint::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
writeString("Hello. You requested part ", out);
writeString(params.get("part"), out);
writeString(".", out);
}
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
@ -59,10 +52,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
checkParts();
}
String endpoint_name = "ReplicatedMergeTree:" + replica_path;
InterserverIOEndpointPtr endpoint = new MyInterserverIOEndpoint(*this);
endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, context.getInterserverIOHandler());
activateReplica();
}
@ -84,7 +73,9 @@ StoragePtr StorageReplicatedMergeTree::create(
path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
dynamic_cast<MyInterserverIOEndpoint &>(*res->endpoint_holder->getEndpoint()).setOwnedStorage(res_ptr);
String endpoint_name = "ReplicatedMergeTree:" + res->replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr);
res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler());
return res_ptr;
}
@ -127,6 +118,7 @@ void StorageReplicatedMergeTree::createTable()
zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/block-numbers", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
}
/** Check that the list of columns and the table settings match those specified in ZK (/metadata).

View File

@ -15,6 +15,10 @@ public:
KeeperException(ReturnCode::type code_)
: DB::Exception(ReturnCode::toString(code_)), code(code_) {}
const char * name() const throw() { return "zkutil::KeeperException"; }
const char * className() const throw() { return "zkutil::KeeperException"; }
KeeperException * clone() const { return new KeeperException(message(), code); }
ReturnCode::type code;
};