Merge branch 'master' into update-delete

This commit is contained in:
Dan Roscigno 2022-09-13 15:20:01 -04:00 committed by GitHub
commit 69e898c2ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 423 additions and 60 deletions

View File

@ -220,6 +220,35 @@ ReplxxLineReader::ReplxxLineReader(
rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); });
rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; });
/// readline insert-comment
auto insert_comment_action = [this](char32_t code)
{
replxx::Replxx::State state(rx.get_state());
const char * line = state.text();
const char * line_end = line + strlen(line);
std::string commented_line;
if (std::find(line, line_end, '\n') != line_end)
{
/// If query has multiple lines, multiline comment is used over
/// commenting each line separately for easier uncomment (though
/// with invoking editor it is simpler to uncomment multiple lines)
///
/// Note, that using multiline comment is OK even with nested
/// comments, since nested comments are supported.
commented_line = fmt::format("/* {} */", state.text());
}
else
{
// In a simplest case use simple comment.
commented_line = fmt::format("-- {}", state.text());
}
rx.set_state(replxx::Replxx::State(commented_line.c_str(), commented_line.size()));
return rx.invoke(Replxx::ACTION::COMMIT_LINE, code);
};
rx.bind_key(Replxx::KEY::meta('#'), insert_comment_action);
}
ReplxxLineReader::~ReplxxLineReader()

View File

@ -264,6 +264,18 @@ protected:
}
};
/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context.
///
/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way,
/// you need to use class, or you need to use ThreadFromGlobalPool below.
///
/// See the comments of ThreadPool below to know how it works.
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
/// An alias of thread that execute jobs/tasks on global thread pool by implicit passing tracing context on current thread to underlying worker as parent tracing context.
/// If jobs/tasks are directly scheduled by using APIs of this class, you need to use this class or you need to use class above.
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
///
/// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker.
@ -274,9 +286,6 @@ protected:
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context are correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
///
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
/// An alias for user code to execute a job in the global thread pool
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;

View File

@ -30,6 +30,7 @@ struct Settings;
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \
M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \

View File

@ -354,9 +354,6 @@ void KeeperDispatcher::shutdown()
update_configuration_thread.join();
}
if (server)
server->shutdown();
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
@ -368,10 +365,58 @@ void KeeperDispatcher::shutdown()
setResponse(request_for_session.session_id, response);
}
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
session_to_response_callback.clear();
KeeperStorage::RequestsForSessions close_requests;
{
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
if (hasLeader())
{
close_requests.reserve(session_to_response_callback.size());
// send to leader CLOSE requests for active sessions
for (const auto & [session, response] : session_to_response_callback)
{
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
using namespace std::chrono;
KeeperStorage::RequestForSession request_info
{
.session_id = session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
close_requests.push_back(std::move(request_info));
}
}
session_to_response_callback.clear();
}
// if there is no leader, there is no reason to do CLOSE because it's a write request
if (hasLeader() && !close_requests.empty())
{
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
const auto raft_result = server->putRequestBatch(close_requests);
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
auto sessions_closing_done = sessions_closing_done_promise->get_future();
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
LOG_WARNING(
log,
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
session_shutdown_timeout);
}
if (server)
server->shutdown();
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
}
catch (...)
{
@ -418,13 +463,15 @@ void KeeperDispatcher::sessionCleanerTask()
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
/// Close session == send close request to raft server
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
KeeperStorage::RequestForSession request_info;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = dead_session;
KeeperStorage::RequestForSession request_info
{
.session_id = dead_session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->push(std::move(request_info)))

View File

@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
threads.resize(size_);
for (auto & thread : threads)
thread = ThreadFromGlobalPool([this] { threadFunction(); });
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); });
}
@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPool([this] { threadFunction(); });
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
}

View File

@ -57,7 +57,9 @@ public:
~BackgroundSchedulePool();
private:
using Threads = std::vector<ThreadFromGlobalPool>;
/// BackgroundSchedulePool schedules a task on its own task queue, there's no need to construct/restore tracing context on this level.
/// This is also how ThreadPool class treats the tracing context. See ThreadPool for more information.
using Threads = std::vector<ThreadFromGlobalPoolNoTracingContextPropagation>;
void threadFunction();
void delayExecutionThreadFunction();
@ -83,7 +85,7 @@ private:
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
ThreadFromGlobalPool delayed_thread;
ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;

View File

@ -28,6 +28,11 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
@ -84,10 +89,13 @@ void SortedBlocksWriter::insert(Block && block)
size_t bytes = 0;
size_t flush_no = 0;
if (!block.rows())
return;
{
std::lock_guard lock{insert_mutex};
/// insert bock into BlocksList undef lock
/// insert block into BlocksList under lock
inserted_blocks.insert(std::move(block));
size_t total_row_count = inserted_blocks.row_count + row_count_in_flush;
@ -145,7 +153,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), num_rows)));
if (pipes.empty())
return {};
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty block");
QueryPipelineBuilder pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));

View File

@ -140,6 +140,11 @@ namespace
size_t rows = 0;
size_t bytes = 0;
UInt32 shard_num = 0;
std::string cluster;
std::string distributed_table;
std::string remote_table;
/// dumpStructure() of the header -- obsolete
std::string block_header_string;
Block block_header;
@ -195,6 +200,14 @@ namespace
in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
}
if (header_buf.hasPendingData())
{
readVarUInt(distributed_header.shard_num, header_buf);
readStringBinary(distributed_header.cluster, header_buf);
readStringBinary(distributed_header.distributed_table, header_buf);
readStringBinary(distributed_header.remote_table, header_buf);
}
/// Add handling new data here, for example:
///
/// if (header_buf.hasPendingData())
@ -621,18 +634,23 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num);
thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster);
thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table);
thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table);
thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows);
thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
connection->getDescription(),
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote{*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,

View File

@ -171,7 +171,6 @@ void DistributedSink::writeAsync(const Block & block)
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
@ -291,6 +290,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("DistrOutStrProc");
@ -331,15 +332,19 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
size_t rows = shard_block.rows();
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", rows);
span.addAttribute("clickhouse.bytes", [&shard_block]() { return toString(shard_block.bytes()); });
/// Do not initiate INSERT for empty block.
if (rows == 0)
return;
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", rows);
if (!job.is_local_job || !settings.prefer_localhost_replica)
{
if (!job.executor)
@ -610,20 +615,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
Block block_to_send = removeSuperfluousColumns(block);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", block.rows());
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
/// Prefer insert into current instance directly
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
else
{
const auto & path = shard_info.insertPathForInternalReplication(
@ -631,13 +631,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block_to_send, {path});
writeToShard(shard_info, block_to_send, {path});
}
}
else
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
@ -645,30 +645,44 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block_to_send, dir_names);
writeToShard(shard_info, block_to_send, dir_names);
}
}
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("db.statement", this->query_string);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", [&block]() { return toString(block.rows()); });
span.addAttribute("clickhouse.bytes", [&block]() { return toString(block.bytes()); });
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
try
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
auto block_io = interp.execute();
PushingPipelineExecutor executor(block_io.pipeline);
auto block_io = interp.execute();
PushingPipelineExecutor executor(block_io.pipeline);
executor.start();
writeBlockConvert(executor, block, repeats, log);
executor.finish();
executor.start();
writeBlockConvert(executor, block, repeats, log);
executor.finish();
}
catch (...)
{
span.addAttribute(std::current_exception());
throw;
}
}
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
@ -759,6 +773,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
header_stream.write(block.cloneEmpty());
}
writeVarUInt(shard_info.shard_num, header_buf);
writeStringBinary(this->storage.cluster_name, header_buf);
writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf);
writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// And note that it is safe, because we have checksum and size for header.

View File

@ -69,9 +69,9 @@ private:
Block removeSuperfluousColumns(Block block) const;
/// Increments finished_writings_count after each repeat.
void writeToLocal(const Block & block, size_t repeats);
void writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
void writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.

View File

@ -1,4 +1,4 @@
<yandex>
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
@ -19,9 +19,19 @@
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -10,7 +10,15 @@ from kazoo.client import KazooClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/keeper_config.xml"], stay_alive=True
"node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True
)
bool_struct = struct.Struct("B")
@ -61,7 +69,7 @@ def wait_node(node):
def wait_nodes():
for n in [node1]:
for n in [node1, node2, node3]:
wait_node(n)
@ -165,3 +173,21 @@ def test_session_timeout(started_cluster):
negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0)
assert negotiated_timeout == 10000
def test_session_close_shutdown(started_cluster):
wait_nodes()
node1_zk = get_fake_zk(node1.name)
node2_zk = get_fake_zk(node2.name)
eph_node = "/test_node"
node2_zk.create(eph_node, ephemeral=True)
assert node1_zk.exists(eph_node) != None
# shutdown while session is active
node2.stop_clickhouse()
assert node1_zk.exists(eph_node) == None
node2.start_clickhouse()

View File

@ -0,0 +1,21 @@
-- Regression test when Join stores data on disk and receive empty block.
-- Because of this it does not create empty file, while expect it.
SET max_threads = 1;
SET join_algorithm = 'auto';
SET max_rows_in_join = 1000;
SET optimize_aggregation_in_order = 1;
SET max_block_size = 1000;
DROP TABLE IF EXISTS join_on_disk;
SYSTEM STOP MERGES join_on_disk;
CREATE TABLE join_on_disk (id Int) Engine=MergeTree() ORDER BY id;
INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(50000);
INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(1000);
SELECT id FROM join_on_disk lhs LEFT JOIN (SELECT id FROM join_on_disk GROUP BY id) rhs USING (id) FORMAT Null;
DROP TABLE join_on_disk;

View File

@ -0,0 +1,8 @@
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}

View File

@ -0,0 +1,91 @@
#!/usr/bin/env bash
# Tags: no-fasttest, distributed
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# This function takes 4 arguments:
# $1 - OpenTelemetry Trace Id
# $2 - value of insert_distributed_sync
# $3 - value of prefer_localhost_replica
# $4 - a String that helps to debug
function insert()
{
echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2, prefer_localhost_replica=$3 VALUES(1),(2)" |
${CLICKHOUSE_CURL} \
-X POST \
-H "traceparent: 00-$1-5150000000000515-01" \
-H "tracestate: $4" \
"${CLICKHOUSE_URL}" \
--data @-
}
function check_span()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT operation_name,
attribute['clickhouse.cluster'] AS cluster,
attribute['clickhouse.shard_num'] AS shard,
attribute['clickhouse.rows'] AS rows,
attribute['clickhouse.bytes'] AS bytes
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND lower(hex(trace_id)) = '${1}'
AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry'
AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry'
ORDER BY attribute['clickhouse.shard_num']
Format JSONEachRow
;"
}
#
# Prepare tables for tests
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry;
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry;
CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards_localhost', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2);
CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=MergeTree ORDER BY key;
"
#
# test1
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 0 1 "async-insert-writeToLocal"
check_span $trace_id
#
# test2
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 0 0 "async-insert-writeToRemote"
check_span $trace_id
#
# test3
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 1 1 "sync-insert-writeToLocal"
check_span $trace_id
#
# test4
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 1 0 "sync-insert-writeToRemote"
check_span $trace_id
#
# Cleanup
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry;
DROP TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry;
"