Save cluster/distributed/table to log

Frank Chen 2022-09-05 16:39:47 +08:00
parent 3d65e3f2ee
commit a17bc51d5b
2 changed files with 18 additions and 13 deletions

src/Storages/Distributed/DistributedSink.cpp

@@ -338,7 +338,11 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
         OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
         span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
-        span.addAttribute("clickhouse.written_rows", rows);
+        span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
+        span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
+        span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
+        span.addAttribute("clickhouse.rows", [rows]() { return std::to_string(rows); });
+        span.addAttribute("clickhouse.bytes", [&shard_block]() { return std::to_string(shard_block.bytes()); });

         if (!job.is_local_job || !settings.prefer_localhost_replica)
         {
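
Note on the hunk above: the last three attributes are supplied as lambdas instead of ready-made strings. A minimal sketch of why such a lazy overload pays off, using a simplified stand-in Span type (illustrative only, not ClickHouse's actual OpenTelemetry code): the supplier runs only when the span is sampled, so untraced inserts never pay for std::to_string or the string concatenation.

#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Span
{
    uint64_t trace_id = 0;  /// zero means the span is not sampled
    std::vector<std::pair<std::string, std::string>> attributes;

    bool isTraceEnabled() const { return trace_id != 0; }

    /// Eager overload: the caller has already built the value.
    void addAttribute(std::string name, std::string value)
    {
        if (isTraceEnabled())
            attributes.emplace_back(std::move(name), std::move(value));
    }

    /// Lazy overload: the supplier is invoked only for sampled spans,
    /// so value construction is skipped entirely when tracing is off.
    void addAttribute(std::string name, std::function<std::string()> value_supplier)
    {
        if (isTraceEnabled() && value_supplier)
            attributes.emplace_back(std::move(name), value_supplier());
    }
};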
@@ -610,20 +614,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
 void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
 {
-    OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
-
     const auto & shard_info = cluster->getShardsInfo()[shard_id];
     const auto & settings = context->getSettingsRef();
     Block block_to_send = removeSuperfluousColumns(block);

-    span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
-    span.addAttribute("clickhouse.written_rows", block.rows());
-
     if (shard_info.hasInternalReplication())
     {
         if (shard_info.isLocal() && settings.prefer_localhost_replica)
             /// Prefer insert into current instance directly
-            writeToLocal(block_to_send, shard_info.getLocalNodeCount());
+            writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
         else
         {
             const auto & path = shard_info.insertPathForInternalReplication(
@@ -631,13 +630,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
                 settings.use_compact_format_in_distributed_parts_names);
             if (path.empty())
                 throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
-            writeToShard(block_to_send, {path});
+            writeToShard(shard_info, block_to_send, {path});
         }
     }
     else
     {
         if (shard_info.isLocal() && settings.prefer_localhost_replica)
-            writeToLocal(block_to_send, shard_info.getLocalNodeCount());
+            writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());

         std::vector<std::string> dir_names;
         for (const auto & address : cluster->getShardsAddresses()[shard_id])
@@ -645,7 +644,7 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
                 dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));

         if (!dir_names.empty())
-            writeToShard(block_to_send, dir_names);
+            writeToShard(shard_info, block_to_send, dir_names);
     }
 }
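
The branch structure of writeAsyncImpl() above boils down to one routing rule, restated below with simplified, hypothetical names (the real code works on Cluster::ShardInfo and the settings shown in the hunks): with internal replication, one queue directory serves the whole shard, because the replicas replicate inserted data among themselves; without it, the sink keeps one queue directory per remote replica address.

#include <string>
#include <vector>

/// Illustrative sketch, not the real DistributedSink API.
std::vector<std::string> chooseQueueDirs(
    bool has_internal_replication,
    const std::string & shard_path,
    const std::vector<std::string> & replica_paths)
{
    if (has_internal_replication)
        return {shard_path};  /// one directory; replicas sync the data themselves
    return replica_paths;     /// one directory per remote replica
}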
@@ -666,9 +665,10 @@ void DistributedSink::writeToLocal(const Block & block, size_t repeats)
 }

-void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
+void DistributedSink::writeToShard(const Cluster::ShardInfo& shard_info, const Block & block, const std::vector<std::string> & dir_names)
 {
     OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
+    span.addAttribute("clickhouse.shard_num", shard_info.shard_num);

     const auto & settings = context->getSettingsRef();
     const auto & distributed_settings = storage.getDistributedSettingsRef();
@@ -759,6 +759,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
                 header_stream.write(block.cloneEmpty());
             }

+            writeVarUInt(shard_info.shard_num, header_buf);
+            writeStringBinary(this->storage.cluster_name, header_buf);
+            writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf);
+            writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf);
+
             /// Add new fields here, for example:
             /// writeVarUInt(my_new_data, header_buf);
             /// And note that it is safe, because we have checksum and size for header.
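
The comment above is what makes these trailing appends backward compatible: the header is written with an explicit size and checksum, so a reader can validate it, consume the fields it understands, and stop. A hedged sketch of the matching read side (simplified; the real reader lives in the distributed directory-monitor code, and the hasPendingData() guard is an assumption about how the new fields are detected):

#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>

using namespace DB;

void readExtraHeaderFields(ReadBuffer & header_buf)
{
    UInt64 shard_num = 0;
    String cluster, distributed_table, remote_table;

    /// Headers from older senders end before these fields, so read them
    /// only if bytes remain inside the size-prefixed, checksummed header.
    if (header_buf.hasPendingData())
    {
        readVarUInt(shard_num, header_buf);
        readStringBinary(cluster, header_buf);
        readStringBinary(distributed_table, header_buf);
        readStringBinary(remote_table, header_buf);
    }
}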

src/Storages/Distributed/DistributedSink.h

@@ -69,9 +69,9 @@ private:
     Block removeSuperfluousColumns(Block block) const;

     /// Increments finished_writings_count after each repeat.
-    void writeToLocal(const Block & block, size_t repeats);
+    void writeToLocal(const Cluster::ShardInfo& shard_info, const Block & block, size_t repeats);

-    void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
+    void writeToShard(const Cluster::ShardInfo& shard_info, const Block & block, const std::vector<std::string> & dir_names);

     /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.