This commit is contained in:
Nikita Mikhaylov 2021-04-08 22:00:39 +03:00
parent 05e04f792e
commit 36a8419f60
20 changed files with 79 additions and 65 deletions

View File

@ -552,10 +552,10 @@ void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
}
void Connection::sendReadTaskResponse(const std::string & responce)
void Connection::sendReadTaskResponse(const std::string & response)
{
writeVarUInt(Protocol::Client::ReadTaskResponse, *out);
writeStringBinary(responce, *out);
writeStringBinary(response, *out);
out->next();
}
@ -843,9 +843,9 @@ Packet Connection::receivePacket()
String Connection::receiveReadTaskRequest() const
{
String next_task;
readStringBinary(next_task, *in);
return next_task;
String read_task;
readStringBinary(read_task, *in);
return read_task;
}

View File

@ -84,8 +84,9 @@ public:
const ClientInfo & client_info,
bool with_pending_data) override;
void sendReadTaskResponce(const String &) override {
throw Exception("sendReadTaskResponce in not supported with HedgedConnections", ErrorCodes::LOGICAL_ERROR);
void sendReadTaskResponse(const String &) override
{
throw Exception("sendReadTaskResponse in not supported with HedgedConnections", ErrorCodes::LOGICAL_ERROR);
}
Packet receivePacket() override;

View File

@ -24,7 +24,7 @@ public:
const ClientInfo & client_info,
bool with_pending_data) = 0;
virtual void sendReadTaskResponce(const String &) = 0;
virtual void sendReadTaskResponse(const String &) = 0;
/// Get packet from any replica.
virtual Packet receivePacket() = 0;

View File

@ -156,7 +156,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
}
void MultiplexedConnections::sendReadTaskResponce(const String & response)
void MultiplexedConnections::sendReadTaskResponse(const String & response)
{
/// No lock_guard because assume it is already called under lock
current_connection->sendReadTaskResponse(response);
@ -217,6 +217,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:
case Protocol::Server::Progress:

View File

@ -39,7 +39,7 @@ public:
const ClientInfo & client_info,
bool with_pending_data) override;
void sendReadTaskResponce(const String &) override;
void sendReadTaskResponse(const String &) override;
Packet receivePacket() override;

View File

@ -383,9 +383,9 @@ bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
void RemoteQueryExecutor::processReadTaskRequest(const String & request)
{
auto query_context = context->getQueryContext();
String responce = query_context->getReadTaskSupervisor()->getNextTaskForId(request);
connections->sendReadTaskResponce(responce);
Context & query_context = context.getQueryContext();
String response = query_context.getTaskSupervisor()->getNextTaskForId(request);
connections->sendReadTaskResponse(response);
}
void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)

View File

@ -88,7 +88,7 @@ public:
/// Set the query_id. For now, used by performance test to later find the query
/// in the server query_log. Must be called before sending the query to the server.
void setQueryId(const std::string& query_id_) { assert(!sent_query); query_id = query_id_; }
void setQueryId(const std::string& query_id_) { assert(!sent_query); std::cout << query_id_ << std::endl; query_id = query_id_; }
/// Specify how we allocate connections on a shard.
void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; }

View File

@ -139,21 +139,12 @@ String Cluster::Address::toString() const
}
String Cluster::Address::getHash() const
{
SipHash hash;
hash.update(host_name);
hash.update(std::to_string(port));
hash.update(user);
hash.update(password);
return std::to_string(hash.get64());
}
String Cluster::Address::toString(const String & host_name, UInt16 port)
{
return escapeForFileName(host_name) + ':' + DB::toString(port);
}
String Cluster::Address::readableString() const
{
String res;

View File

@ -122,9 +122,6 @@ public:
/// Returns 'escaped_host_name:port'
String toString() const;
/// Returns hash of all fields
String getHash() const;
/// Returns 'host_name:port'
String readableString() const;

View File

@ -2616,7 +2616,7 @@ PartUUIDsPtr Context::getPartUUIDs()
}
TaskSupervisorPtr Context::getReadTaskSupervisor() const
TaskSupervisorPtr Context::getTaskSupervisor() const
{
return read_task_supervisor;
}
@ -2628,7 +2628,7 @@ void Context::setReadTaskSupervisor(TaskSupervisorPtr resolver)
}
NextTaskCallback Context::getNextTaskCallback() const
ReadTaskCallback Context::getReadTaskCallback() const
{
if (!next_task_callback.has_value())
throw Exception(fmt::format("Next task callback is not set for query {}", getInitialQueryId()), ErrorCodes::LOGICAL_ERROR);
@ -2636,7 +2636,7 @@ NextTaskCallback Context::getNextTaskCallback() const
}
void Context::setNextTaskCallback(NextTaskCallback && callback)
void Context::setReadTaskCallback(ReadTaskCallback && callback)
{
next_task_callback = callback;
}

View File

@ -131,7 +131,7 @@ using InputBlocksReader = std::function<Block(ContextPtr)>;
/// Class which gives tasks to other nodes in cluster
class TaskSupervisor;
using TaskSupervisorPtr = std::shared_ptr<TaskSupervisor>;
using NextTaskCallback = std::function<String(String)>;
using ReadTaskCallback = std::function<String(String)>;
/// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library.
@ -196,7 +196,7 @@ private:
/// Fields for distributed s3 function
TaskSupervisorPtr read_task_supervisor;
std::optional<NextTaskCallback> next_task_callback;
std::optional<ReadTaskCallback> next_task_callback;
/// Record entities accessed by current query, and store this information in system.query_log.
struct QueryAccessInfo
@ -780,11 +780,11 @@ public:
PartUUIDsPtr getIgnoredPartUUIDs();
/// A bunch of functions for distributed s3 function
TaskSupervisorPtr getReadTaskSupervisor() const;
TaskSupervisorPtr getTaskSupervisor() const;
void setReadTaskSupervisor(TaskSupervisorPtr);
NextTaskCallback getNextTaskCallback() const;
void setNextTaskCallback(NextTaskCallback && callback);
ReadTaskCallback getReadTaskCallback() const;
void setReadTaskCallback(ReadTaskCallback && callback);
private:
std::unique_lock<std::recursive_mutex> getLock() const;

View File

@ -288,16 +288,17 @@ void TCPHandler::runImpl()
customizeContext(query_context);
/// Create task supervisor for distributed task processing
query_context->setReadTaskSupervisor(std::make_shared<TaskSupervisor>());
/// This callback is needed for requsting read tasks inside pipeline for distributed processing
query_context->setNextTaskCallback([this](String request) -> String
query_context->setReadTaskCallback([this](String request) mutable -> String
{
std::lock_guard lock(buffer_mutex);
std::lock_guard lock(task_callback_mutex);
sendReadTaskRequestAssumeLocked(request);
return receiveReadTaskResponseAssumeLocked();
});
query_context->setReadTaskSupervisor(std::make_shared<TaskSupervisor>());
bool may_have_embedded_data = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
/// Processing Query
state.io = executeQuery(state.query, query_context, false, state.stage, may_have_embedded_data);
@ -664,7 +665,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
break;
}
std::lock_guard lock(buffer_mutex);
std::lock_guard lock(task_callback_mutex);
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{

View File

@ -138,8 +138,6 @@ private:
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
std::mutex buffer_mutex;
/// Time after the last check to stop the request and send the progress.
Stopwatch after_check_cancelled;
@ -152,6 +150,7 @@ private:
String cluster;
String cluster_secret;
std::mutex task_callback_mutex;
/// At the moment, only one ongoing query in the connection is supported at a time.
QueryState state;

View File

@ -187,7 +187,6 @@ bool StorageS3Source::initialize()
return false;
}
file_path = bucket + "/" + current_key;
std::cout << "StorageS3Source " << file_path << std::endl;
}
else
{

View File

@ -110,7 +110,7 @@ Pipe StorageS3Distributed::read(
}
auto file_iterator = std::make_shared<DistributedFileIterator>(
context.getNextTaskCallback(),
context.getReadTaskCallback(),
context.getClientInfo().task_identifier);
return Pipe(std::make_shared<StorageS3Source>(
@ -140,7 +140,7 @@ Pipe StorageS3Distributed::read(
std::cout << "Generated UUID : " << task_identifier << std::endl;
/// Register resolver, which will give other nodes a task std::make_unique
context.getReadTaskSupervisor()->registerNextTaskResolver(
context.getTaskSupervisor()->registerNextTaskResolver(
std::make_unique<ReadTaskResolver>(task_identifier, std::move(callback)));
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

View File

@ -79,10 +79,10 @@ private:
struct DistributedFileIterator : public StorageS3Source::FileIterator
{
DistributedFileIterator(NextTaskCallback callback_, String identifier_)
DistributedFileIterator(ReadTaskCallback callback_, String identifier_)
: callback(callback_), identifier(identifier_) {}
NextTaskCallback callback;
ReadTaskCallback callback;
String identifier;
std::optional<String> next() override

View File

@ -48,17 +48,15 @@ public:
target = std::move(resolver);
}
/// Do not erase anything from the map, because TaskSupervisor is stored
/// into context and will be deleted after query ends.
Task getNextTaskForId(const QueryId & id)
{
std::lock_guard lock(mutex);
auto it = dict.find(id);
if (it == dict.end())
return "";
auto answer = it->second->callback();
if (answer.empty())
dict.erase(it);
return answer;
return it->second->callback();
}
private:

View File

@ -101,21 +101,14 @@ ColumnsDescription TableFunctionS3Distributed::getActualTableStructure(const Con
}
StoragePtr TableFunctionS3Distributed::executeImpl(
const ASTPtr & /*filename*/, const Context & context,
const ASTPtr & /*function*/, const Context & context,
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
StoragePtr storage = StorageS3Distributed::create(
filename,
access_key_id,
secret_access_key,
StorageID(getDatabaseName(), table_name),
cluster_name,
format,
context.getSettingsRef().s3_max_connections,
getActualTableStructure(context),
ConstraintsDescription{},
context,
compression_method);
filename, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
cluster_name, format, context.getSettingsRef().s3_max_connections,
getActualTableStructure(context), ConstraintsDescription{},
context, compression_method);
storage->startup();

View File

@ -75,9 +75,40 @@ def test_count(started_cluster):
assert TSV(pure_s3) == TSV(s3_distibuted)
def test_union_all(started_cluster):
node = started_cluster.instances['s0_0_0']
pure_s3 = node.query("""
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
ORDER BY (name, value, polygon)
UNION ALL
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
ORDER BY (name, value, polygon)
""")
# print(pure_s3)
s3_distibuted = node.query("""
SELECT * from s3Distributed(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)""")
# print(s3_distibuted)
assert TSV(pure_s3) == TSV(s3_distibuted)
def test_wrong_cluster(started_cluster):
node = started_cluster.instances['s0_0_0']
error = node.query_and_get_error("""
SELECT count(*) from s3Distributed(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT count(*) from s3Distributed(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',

View File

@ -8,5 +8,8 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CUR_DIR"/../shell_config.sh
echo $S3_ACCESS_KEY_ID
echo $S3_SECRET_ACCESS
${CLICKHOUSE_CLIENT_BINARY} --send_logs_level="none" -q "SELECT * FROM s3('https://s3.mds.yandex.net/clickhouse-test-reports/*/*/functional_stateless_tests_(ubsan)/test_results.tsv', '$S3_ACCESS_KEY_ID', '$S3_SECRET_ACCESS', 'LineAsString', 'line String') limit 100 FORMAT Null;"
${CLICKHOUSE_CLIENT_BINARY} --send_logs_level="none" -q "SELECT * FROM s3Distributed('test_cluster_two_shards', 'https://s3.mds.yandex.net/clickhouse-test-reports/*/*/functional_stateless_tests_(ubsan)/test_results.tsv', '$S3_ACCESS_KEY_ID', '$S3_SECRET_ACCESS', 'LineAsString', 'line String') limit 100 FORMAT Null;"
${CLICKHOUSE_CLIENT_BINARY} --send_logs_level="none" -q "SELECT * FROM s3Distributed('test_cluster_two_shards', 'https://s3.mds.yandex.net/clickhouse-test-reports/*/*/functional_stateless_tests_(ubsan)/test_results.tsv', '$S3_ACCESS_KEY_ID', '$S3_SECRET_ACCESS', 'LineAsString', 'line String') limit 100 FORMAT Null;"