in-memory parts: replication

This commit is contained in:
Anton Popov 2020-04-29 20:14:49 +03:00
parent 1789d6fa82
commit 42997bce86
10 changed files with 178 additions and 70 deletions

View File: DataPartsExchange.cpp

@ -1,6 +1,9 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <IO/HTTPCommon.h>
#include <ext/scope_guard.h>
#include <Poco/File.h>
@ -54,6 +57,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
String part_name = params.get("part");
String part_type = params.get("part_type", "Wide"); // TODO: correct type with old versions
const auto data_settings = data.getSettings();
@ -84,24 +88,16 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
LOG_TRACE(log, "Sending part " << part_name);
try
{
{
auto storage_lock = data.lockStructureForShare(
false, RWLockImpl::NO_QUERY, data.getSettings()->lock_acquire_timeout_for_background_operations);
false, RWLockImpl::NO_QUERY, data.getSettings()->lock_acquire_timeout_for_background_operations);
MergeTreeData::DataPartPtr part = findPart(part_name);
auto part = findPart(part_name);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
/// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Add files that are not in the checksum list.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
MergeTreeData::DataPart::Checksums data_checksums;
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
writeBinary(checksums.getTotalSizeOnDisk(), out);
writeBinary(part->checksums.getTotalSizeOnDisk(), out);
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
@ -110,37 +106,10 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
writeBinary(ttl_infos_buffer.str(), out);
}
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
auto disk = part->disk;
String path = part->getFullRelativePath() + file_name;
UInt64 size = disk->getFileSize(path);
writeStringBinary(it.first, out);
writeBinary(size, out);
auto file_in = disk->readFile(path);
HashingWriteBuffer hashing_out(out);
copyData(*file_in, hashing_out, blocker.getCounter());
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writePODBinary(hashing_out.getHash(), out);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
}
part->checksums.checkEqual(data_checksums, false);
if (part_type == "InMemory")
sendPartFromMemory(part, out, storage_lock);
else
sendPartFromDisk(part, out, storage_lock);
}
catch (const NetException &)
{
@ -160,6 +129,61 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
}
}
void Service::sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, TableStructureReadLockHolder &)
{
auto part_in_memory = dynamic_cast<const MergeTreeDataPartInMemory *>(part.get());
if (!part_in_memory)
throw Exception("Part " + part->name + " is not stored in memory", ErrorCodes::NO_SUCH_DATA_PART); // TODO error code
NativeBlockOutputStream block_out(out, 0, data.getSampleBlock());
block_out.write(part_in_memory->block);
// TODO send checksums
}
void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, TableStructureReadLockHolder &)
{
/// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Add files that are not in the checksum list.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
MergeTreeData::DataPart::Checksums data_checksums;
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
auto disk = part->disk;
String path = part->getFullRelativePath() + file_name;
UInt64 size = disk->getFileSize(path);
writeStringBinary(it.first, out);
writeBinary(size, out);
auto file_in = disk->readFile(path);
HashingWriteBuffer hashing_out(out);
copyData(*file_in, hashing_out, blocker.getCounter());
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writePODBinary(hashing_out.getHash(), out);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
}
part->checksums.checkEqual(data_checksums, false);
}
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
/// It is important to include PreCommitted and Outdated parts here because remote replicas cannot reliably
@ -174,6 +198,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & part_name,
const String & part_type,
const String & replica_path,
const String & host,
int port,
@ -196,6 +221,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"part_type", part_type},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)},
{"compress", "false"}
});
@ -244,10 +270,48 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
reservation = data.makeEmptyReservationOnLargestDisk();
}
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
return part_type == "InMemory" ? downloadPartToMemory(part_name, replica_path, in)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
const String & /* replica_path */,
PooledReadWriteBufferFromHTTP & in)
{
NativeBlockInputStream block_in(in, 0);
auto block = block_in.read();
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, nullptr);
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->minmax_idx.update(block, data.minmax_idx_columns);
auto partition_block = block;
data.partition_key_expr->execute(partition_block);
auto & partition = new_data_part->partition.value;
size_t partition_columns_num = data.partition_key_sample.columns();
partition.resize(partition_columns_num);
for (size_t i = 0; i < partition_columns_num; ++i)
{
const auto & column_name = data.partition_key_sample.getByPosition(i).name;
const auto & partition_column = partition_block.getByName(column_name).column;
partition[i] = (*partition_column)[0];
}
MergedBlockOutputStream part_out(new_data_part, block.getNamesAndTypesList(), {}, nullptr);
part_out.writePrefix();
part_out.write(block);
part_out.writeSuffixAndFinalizePart(new_data_part);
// TODO validate checksums
return new_data_part;
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const String & part_name,
const String & replica_path,
bool to_detached,

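Note on the exchange above: when the requested part_type is InMemory, the server skips the per-file loop entirely and streams the part's single block in the native format, and the fetcher mirrors this with NativeBlockInputStream before rebuilding the part's metadata (columns, minmax index, partition values) from that block. Below is a minimal sketch of the two ends of that exchange; sendInMemoryPart and receiveInMemoryPart are illustrative names, not the actual methods, and the real code also takes the storage lock and HTTP buffers shown in the diff.

#include <Core/Block.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>

using namespace DB;

/// Sender side: serialize the in-memory part's block as-is.
void sendInMemoryPart(const Block & part_block, const Block & sample, WriteBuffer & out)
{
    NativeBlockOutputStream block_out(out, /* client_revision = */ 0, sample);
    block_out.write(part_block);   /// checksums are not transferred yet (see the TODO above)
}

/// Fetcher side: read the block back; columns, the minmax index and partition
/// values are then derived from it, as downloadPartToMemory does above.
Block receiveInMemoryPart(ReadBuffer & in)
{
    NativeBlockInputStream block_in(in, /* server_revision = */ 0);
    return block_in.read();
}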
View File: DataPartsExchange.h

@ -31,6 +31,8 @@ public:
private:
MergeTreeData::DataPartPtr findPart(const String & name);
void sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, TableStructureReadLockHolder & storage_lock);
void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, TableStructureReadLockHolder & storage_lock);
private:
/// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
@ -52,6 +54,7 @@ public:
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
const String & part_name,
const String & part_type,
const String & replica_path,
const String & host,
int port,
@ -66,7 +69,7 @@ public:
ActionBlocker blocker;
private:
MergeTreeData::MutableDataPartPtr downloadPart(
MergeTreeData::MutableDataPartPtr downloadPartToDisk(
const String & part_name,
const String & replica_path,
bool to_detached,
@ -74,6 +77,11 @@ private:
const ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in);
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
const String & part_name,
const String & replica_path,
PooledReadWriteBufferFromHTTP & in);
MergeTreeData & data;
Logger * log;
};

View File: MergeTreeDataPartWriterOnDisk.cpp

@ -222,6 +222,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
/// Write index. The index contains Primary Key value for each `index_granularity` row.
while (current_mark < )
for (size_t i = index_offset; i < rows;)
{
if (storage.hasPrimaryKey())

View File: MergeTreeWriteAheadLog.cpp

@ -60,7 +60,7 @@ void MergeTreeWriteAheadLog::rotate(const std::lock_guard<std::mutex> & /*write_
out = disk->writeFile(path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
block_out = std::make_unique<NativeBlockOutputStream>(*out, 0, storage.getSampleBlock());
min_block_number = std::numeric_limits<Int64>::max();
max_block_number = 0;
max_block_number = std::numeric_limits<Int64>::min();
}
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()

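The rotate() change above fixes the reset value of max_block_number: for a running maximum over appended blocks the neutral starting value is the smallest representable Int64, just as the running minimum starts from the largest. Resetting the maximum to 0 would make a freshly rotated, still-empty WAL report a spurious maximum block number of 0. A small illustration of the invariant follows; onBlockAppended is a hypothetical helper, not part of the diff.

#include <algorithm>
#include <cstdint>
#include <limits>

int64_t min_block_number = std::numeric_limits<int64_t>::max();  /// identity for a running min
int64_t max_block_number = std::numeric_limits<int64_t>::min();  /// identity for a running max (was 0 before this commit)

void onBlockAppended(int64_t block_number)
{
    min_block_number = std::min(min_block_number, block_number);
    max_block_number = std::max(max_block_number, block_number);
}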
View File: MergeTreeWriteAheadLog.h

@ -48,7 +48,7 @@ private:
std::unique_ptr<NativeBlockOutputStream> block_out;
Int64 min_block_number = std::numeric_limits<Int64>::max();
Int64 max_block_number = 0;
Int64 max_block_number = std::numeric_limits<Int64>::min();
mutable std::mutex write_mutex;
};

View File: ReplicatedMergeTreeBlockOutputStream.cpp

@ -239,6 +239,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
log_entry.new_part_name = part_name;
log_entry.quorum = quorum;
log_entry.block_id = block_id;
log_entry.new_part_type = part->getType();
/// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.

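With new_part_type recorded in the replication log entry, the fetching replica knows which download path to request before any data is transferred. A condensed view of the round trip, using only names that appear in this commit (this restates the hunks above and below, it is not additional code):

/// Writer (ReplicatedMergeTreeBlockOutputStream::commitPart):
log_entry.new_part_type = part->getType();                 /// Wide, Compact or InMemory

/// Fetching replica (StorageReplicatedMergeTree::executeFetch):
String part_type = entry.new_part_type.toString();
fetchPart(part_name, part_type, source_replica_path, /* to_detached = */ false, entry.quorum);

/// Fetcher (DataPartsExchange): the string is sent as the "part_type" HTTP
/// parameter and selects downloadPartToMemory() vs downloadPartToDisk().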
View File: StorageReplicatedMergeTree.cpp

@ -1400,7 +1400,8 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
try
{
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
String part_type = entry.new_part_type.toString();
if (!fetchPart(part_name, part_type, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
return false;
}
catch (Exception & e)
@ -1744,7 +1745,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, source_replica_path,
part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, "Wide", source_replica_path, // TODO: fix part type
address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -2693,7 +2694,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
}
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & part_type,
const String & source_replica_path, bool to_detached, size_t quorum)
{
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
@ -2798,7 +2800,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ErrorCodes::LOGICAL_ERROR);
return fetcher.fetchPart(
part_name, source_replica_path,
part_name, part_type, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
};
@ -4305,7 +4307,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
{
try
{
fetchPart(part, best_replica_path, true, 0);
fetchPart(part, "Wide", best_replica_path, true, 0); // TODO: fix part type
}
catch (const DB::Exception & e)
{

View File: StorageReplicatedMergeTree.h

@ -454,7 +454,7 @@ private:
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if part is already fetching right now.
*/
bool fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum);
bool fetchPart(const String & part_name, const String & part_type, const String & replica_path, bool to_detached, size_t quorum);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;

View File: StorageSystemParts.cpp

@ -19,7 +19,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name_)
{
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"part_type", std::make_shared<DataTypeString>()},
{"part_type", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
{"rows", std::make_shared<DataTypeUInt64>()},
@ -111,8 +111,16 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
columns_[i++]->insert(info.database);
columns_[i++]->insert(info.table);
columns_[i++]->insert(info.engine);
columns_[i++]->insert(part->disk->getName());
columns_[i++]->insert(part->getFullPath());
if (part->isStoredOnDisk())
{
columns_[i++]->insert(part->disk->getName());
columns_[i++]->insert(part->getFullPath());
}
else
{
columns_[i++]->insertDefault();
columns_[i++]->insertDefault();
}
if (has_state_column)
columns_[i++]->insert(part->stateString());

View File: test_polymorphic_parts/test.py

@ -36,8 +36,8 @@ def create_tables(name, nodes, node_settings, shard):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/{name}', '{repl}')
PARTITION BY toYYYYMM(date)
ORDER BY id
SETTINGS index_granularity = {index_granularity}, index_granularity_bytes = {index_granularity_bytes},
min_rows_for_wide_part = {min_rows_for_wide_part}, min_bytes_for_wide_part = {min_bytes_for_wide_part}
SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes},
min_rows_for_wide_part = {min_rows_for_wide_part}, min_rows_for_compact_part = {min_rows_for_compact_part}
'''.format(name=name, shard=shard, repl=i, **settings))
def create_tables_old_format(name, nodes, shard):
@ -51,19 +51,24 @@ def create_tables_old_format(name, nodes, shard):
node1 = cluster.add_instance('node1', config_dir="configs", with_zookeeper=True)
node2 = cluster.add_instance('node2', config_dir="configs", with_zookeeper=True)
settings_default = {'index_granularity' : 64, 'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_bytes_for_wide_part' : 0}
settings_compact_only = {'index_granularity' : 64, 'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 1000000, 'min_bytes_for_wide_part' : 0}
settings_not_adaptive = {'index_granularity' : 64, 'index_granularity_bytes' : 0, 'min_rows_for_wide_part' : 512, 'min_bytes_for_wide_part' : 0}
settings_default = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 0}
settings_compact_only = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 1000000, 'min_rows_for_compact_part' : 0}
settings_not_adaptive = {'index_granularity_bytes' : 0, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 0}
node3 = cluster.add_instance('node3', config_dir="configs", with_zookeeper=True)
node4 = cluster.add_instance('node4', config_dir="configs", main_configs=['configs/no_leader.xml'], with_zookeeper=True)
settings_compact = {'index_granularity' : 64, 'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_bytes_for_wide_part' : 0}
settings_wide = {'index_granularity' : 64, 'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 0, 'min_bytes_for_wide_part' : 0}
settings_compact = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 0}
settings_wide = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 0, 'min_rows_for_compact_part' : 0}
node5 = cluster.add_instance('node5', config_dir='configs', main_configs=['configs/compact_parts.xml'], with_zookeeper=True)
node6 = cluster.add_instance('node6', config_dir='configs', main_configs=['configs/compact_parts.xml'], with_zookeeper=True)
settings_in_memory = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 256}
node9 = cluster.add_instance('node9', config_dir="configs", with_zookeeper=True)
node10 = cluster.add_instance('node10', config_dir="configs", with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
@ -75,6 +80,7 @@ def start_cluster():
create_tables('polymorphic_table_compact', [node3, node4], [settings_compact, settings_wide], "shard2")
create_tables('polymorphic_table_wide', [node3, node4], [settings_wide, settings_compact], "shard2")
create_tables_old_format('polymorphic_table', [node5, node6], "shard3")
create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
yield cluster
@ -84,8 +90,8 @@ def start_cluster():
@pytest.mark.parametrize(
('first_node', 'second_node'),
[
(node1, node2),
(node5, node6)
(node1, node2), # compact parts
(node5, node6), # compact parts, old-format
]
)
def test_polymorphic_parts_basics(start_cluster, first_node, second_node):
@ -198,8 +204,8 @@ def test_different_part_types_on_replicas(start_cluster, table, part_type):
node7 = cluster.add_instance('node7', config_dir="configs", with_zookeeper=True, image='yandex/clickhouse-server:19.17.8.54', stay_alive=True, with_installed_binary=True)
node8 = cluster.add_instance('node8', config_dir="configs", with_zookeeper=True)
settings7 = {'index_granularity' : 64, 'index_granularity_bytes' : 10485760}
settings8 = {'index_granularity' : 64, 'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_bytes_for_wide_part' : 0}
settings7 = {'index_granularity_bytes' : 10485760}
settings8 = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 0}
@pytest.fixture(scope="module")
def start_cluster_diff_versions():
@ -212,7 +218,7 @@ def start_cluster_diff_versions():
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/shard5/{name}', '1')
PARTITION BY toYYYYMM(date)
ORDER BY id
SETTINGS index_granularity = {index_granularity}, index_granularity_bytes = {index_granularity_bytes}
SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes}
'''.format(name=name, **settings7)
)
@ -222,7 +228,7 @@ def start_cluster_diff_versions():
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/shard5/{name}', '2')
PARTITION BY toYYYYMM(date)
ORDER BY id
SETTINGS index_granularity = {index_granularity}, index_granularity_bytes = {index_granularity_bytes},
SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes},
min_rows_for_wide_part = {min_rows_for_wide_part}, min_bytes_for_wide_part = {min_bytes_for_wide_part}
'''.format(name=name, **settings8)
)
@ -287,3 +293,21 @@ def test_polymorphic_parts_non_adaptive(start_cluster):
"WHERE table = 'non_adaptive_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV("Wide\t2\n")
assert node1.contains_in_log("<Warning> default.non_adaptive_table: Table can't create parts with adaptive granularity")
def test_in_memory(start_cluster):
node9.query("SYSTEM STOP MERGES")
node10.query("SYSTEM STOP MERGES")
for size in [200, 200, 300, 600]:
insert_random_data('in_memory_table', node9, size)
node10.query("SYSTEM SYNC REPLICA in_memory_table", timeout=20)
assert node9.query("SELECT count() FROM in_memory_table") == "1300\n"
assert node10.query("SELECT count() FROM in_memory_table") == "1300\n"
expected = "Compact\t1\nInMemory\t2\nWide\t1\n"
assert TSV(node9.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(expected)
assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(expected)
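For reference, the expected counts in test_in_memory follow from the thresholds in settings_in_memory (min_rows_for_compact_part = 256, min_rows_for_wide_part = 512) applied to the four inserted blocks; since SYSTEM STOP MERGES keeps each insert as a separate part, the breakdown is a direct reading of the test above, not an extra assertion:

200 rows  -> InMemory  (200 < 256)
200 rows  -> InMemory  (200 < 256)
300 rows  -> Compact   (256 <= 300 < 512)
600 rows  -> Wide      (600 >= 512)
total rows: 200 + 200 + 300 + 600 = 1300; active parts: Compact 1, InMemory 2, Wide 1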