Merge branch 'master' into no-finalize-WriteBufferFromOStream

Commit ecea5da70c by Sema Checherinda, 2023-06-28 17:04:25 +02:00, committed via GitHub.
16 changed files with 163 additions and 61 deletions

View File

@@ -89,7 +89,7 @@ RUN arch=${TARGETARCH:-amd64} \
&& dpkg -i /tmp/nfpm.deb \
&& rm /tmp/nfpm.deb
ARG GO_VERSION=1.19.5
ARG GO_VERSION=1.19.10
# We need go for clickhouse-diagnostics
RUN arch=${TARGETARCH:-amd64} \
&& curl -Lo /tmp/go.tgz "https://go.dev/dl/go${GO_VERSION}.linux-${arch}.tar.gz" \

View File

@@ -1,4 +1,4 @@
FROM ubuntu:22.04
FROM ubuntu:20.04
# see https://github.com/moby/moby/issues/4032#issuecomment-192327844
ARG DEBIAN_FRONTEND=noninteractive
@@ -11,17 +11,18 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
&& apt-get update \
&& apt-get upgrade -yq \
&& apt-get install --yes --no-install-recommends \
apt-transport-https \
ca-certificates \
dirmngr \
gnupg2 \
wget \
locales \
tzdata \
&& apt-get clean
wget \
&& apt-get clean \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/*
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.5.3.24"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
@@ -43,7 +44,8 @@ ARG single_binary_location_url=""
ARG TARGETARCH
RUN arch=${TARGETARCH:-amd64} \
# install from a web location with deb packages
RUN arch="${TARGETARCH:-amd64}" \
&& if [ -n "${deb_location_url}" ]; then \
echo "installing from custom url with deb packages: ${deb_location_url}" \
rm -rf /tmp/clickhouse_debs \
@@ -54,38 +56,54 @@ RUN arch=${TARGETARCH:-amd64} \
|| exit 1 \
; done \
&& dpkg -i /tmp/clickhouse_debs/*.deb ; \
elif [ -n "${single_binary_location_url}" ]; then \
fi
# install from a single binary
RUN if [ -n "${single_binary_location_url}" ]; then \
echo "installing from single binary url: ${single_binary_location_url}" \
&& rm -rf /tmp/clickhouse_binary \
&& mkdir -p /tmp/clickhouse_binary \
&& wget --progress=bar:force:noscroll "${single_binary_location_url}" -O /tmp/clickhouse_binary/clickhouse \
&& chmod +x /tmp/clickhouse_binary/clickhouse \
&& /tmp/clickhouse_binary/clickhouse install --user "clickhouse" --group "clickhouse" ; \
else \
mkdir -p /etc/apt/sources.list.d \
&& apt-key adv --keyserver keyserver.ubuntu.com --recv 8919F6BD2B48D754 \
&& echo ${REPOSITORY} > /etc/apt/sources.list.d/clickhouse.list \
fi
# A fallback to installation from ClickHouse repository
RUN if ! clickhouse local -q "SELECT ''" > /dev/null; then \
apt-get update \
&& apt-get install --yes --no-install-recommends \
apt-transport-https \
ca-certificates \
dirmngr \
gnupg2 \
&& mkdir -p /etc/apt/sources.list.d \
&& GNUPGHOME=$(mktemp -d) \
&& GNUPGHOME="$GNUPGHOME" gpg --no-default-keyring \
--keyring /usr/share/keyrings/clickhouse-keyring.gpg \
--keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 8919F6BD2B48D754 \
&& rm -r "$GNUPGHOME" \
&& chmod +r /usr/share/keyrings/clickhouse-keyring.gpg \
&& echo "${REPOSITORY}" > /etc/apt/sources.list.d/clickhouse.list \
&& echo "installing from repository: ${REPOSITORY}" \
&& apt-get update \
&& apt-get --yes -o "Dpkg::Options::=--force-confdef" -o "Dpkg::Options::=--force-confold" upgrade \
&& for package in ${PACKAGES}; do \
packages="${packages} ${package}=${VERSION}" \
; done \
&& apt-get install --allow-unauthenticated --yes --no-install-recommends ${packages} || exit 1 \
; fi \
&& clickhouse-local -q 'SELECT * FROM system.build_options' \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
&& mkdir -p /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client \
&& chmod ugo+Xrw -R /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client
RUN apt-get autoremove --purge -yq libksba8 && \
apt-get autoremove -yq
&& apt-get autoremove --purge -yq libksba8 \
&& apt-get autoremove -yq \
; fi
# post install
# we need to allow "others" access to the clickhouse folder, because the docker container
# can be started with an arbitrary uid (openshift use case)
RUN clickhouse-local -q 'SELECT * FROM system.build_options' \
&& mkdir -p /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client \
&& chmod ugo+Xrw -R /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client
RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8

View File

@@ -20,7 +20,6 @@ For more information and documentation see https://clickhouse.com/.
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.
- The arm64 image requires support for the [ARMv8.2-A architecture](https://en.wikipedia.org/wiki/AArch64#ARMv8.2-A). Most ARM CPUs after 2017 support ARMv8.2-A. A notable exception is Raspberry Pi 4 from 2019 whose CPU only supports ARMv8.0-A.
- Since the ClickHouse 23.3 Ubuntu image started using `ubuntu:22.04` as its base image, it requires docker version >= `20.10.10`, or use `docker run --privileged` instead. Alternatively, try the ClickHouse Alpine image.
## How to use this image

View File

@@ -2454,18 +2454,22 @@ In this format, all input data is read to a single value. It is possible to pars
The result is output in binary format without delimiters or escaping. If more than one value is output, the format is ambiguous, and it will be impossible to read the data back.
Below is a comparison of the formats `RawBLOB` and [TabSeparatedRaw](#tabseparatedraw).
`RawBLOB`:
- data is output in binary format, no escaping;
- there are no delimiters between values;
- no newline at the end of each value.
[TabSeparatedRaw] (#tabseparatedraw):
`TabSeparatedRaw`:
- data is output without escaping;
- the rows contain values separated by tabs;
- there is a line feed after the last value in every row.
The following is a comparison of the `RawBLOB` and [RowBinary](#rowbinary) formats.
`RawBLOB`:
- String fields are output without being prefixed by length.
`RowBinary`:
- String fields are represented as length in varint format (unsigned [LEB128](https://en.wikipedia.org/wiki/LEB128)), followed by the bytes of the string.
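
To make the comparison concrete, here is a minimal C++ sketch of RowBinary-style string framing: an unsigned LEB128 length prefix followed by the raw bytes, while RawBLOB would emit only the bytes. The helper names are invented for this example and are not ClickHouse's WriteBuffer API.

#include <cstdint>
#include <string>
#include <vector>

// Append an unsigned LEB128 (varint) value: 7 bits per byte, high bit = "more bytes follow".
void writeVarUInt(uint64_t value, std::vector<uint8_t> & out)
{
    while (value >= 0x80)
    {
        out.push_back(static_cast<uint8_t>(value) | 0x80);
        value >>= 7;
    }
    out.push_back(static_cast<uint8_t>(value));
}

// RowBinary-style string: varint length, then the bytes. RawBLOB would skip the length.
void writeRowBinaryString(const std::string & s, std::vector<uint8_t> & out)
{
    writeVarUInt(s.size(), out);
    out.insert(out.end(), s.begin(), s.end());
}

int main()
{
    std::vector<uint8_t> buf;
    writeRowBinaryString("hello", buf); // produces: 0x05 'h' 'e' 'l' 'l' 'o'
    return buf.size() == 6 ? 0 : 1;
}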

View File

@@ -1175,16 +1175,12 @@ ProfileInfo Connection::receiveProfileInfo() const
ParallelReadRequest Connection::receiveParallelReadRequest() const
{
ParallelReadRequest request;
request.deserialize(*in);
return request;
return ParallelReadRequest::deserialize(*in);
}
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnounecement() const
{
InitialAllRangesAnnouncement announcement;
announcement.deserialize(*in);
return announcement;
return InitialAllRangesAnnouncement::deserialize(*in);
}
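
The diff above replaces the mutate-in-place member deserialize with a static factory that returns a fully constructed value; this is what allows the default constructors to be dropped in RequestResponse.h further down. A self-contained sketch of the pattern, with a toy Message standing in for ParallelReadRequest (nothing here is ClickHouse API):

#include <cstdint>
#include <stdexcept>
#include <vector>

// Stand-in for DB::ReadBuffer: yields pre-recorded integers.
struct ReadBufferStub
{
    std::vector<uint64_t> data;
    size_t pos = 0;
    uint64_t next() { return data.at(pos++); }
};

// Simplified message type.
struct Message
{
    uint64_t replica_num;
    explicit Message(uint64_t replica_num_) : replica_num(replica_num_) {}

    // Factory-style deserialize: the object is constructed only after all
    // fields are read and validated, so a half-initialized Message never escapes.
    static Message deserialize(ReadBufferStub & in)
    {
        uint64_t version = in.next();
        if (version != 1)
            throw std::runtime_error("unsupported protocol version");
        return Message(in.next());
    }
};

int main()
{
    ReadBufferStub in{{1, 42}};
    Message m = Message::deserialize(in); // replaces: Message m; m.deserialize(in);
    return m.replica_num == 42 ? 0 : 1;
}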

View File

@@ -16,6 +16,10 @@
#include <boost/noncopyable.hpp>
#include <optional>
#include <vector>
#include <memory>
#include <string>
namespace DB
{
@@ -34,9 +38,9 @@ struct Packet
ProfileInfo profile_info;
std::vector<UUID> part_uuids;
InitialAllRangesAnnouncement announcement;
ParallelReadRequest request;
ParallelReadResponse response;
/// Part of the parallel replicas protocol
std::optional<InitialAllRangesAnnouncement> announcement;
std::optional<ParallelReadRequest> request;
std::string server_timezone;
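
Making announcement and request std::optional lets an empty optional encode "this packet carries no such message", instead of relying on a default-constructed request object, which no longer exists once the default constructors are removed. A minimal sketch with toy types, not the real Packet:

#include <optional>
#include <stdexcept>

// Toy stand-ins for ParallelReadRequest and Packet.
struct Request { int replica_num; };

struct Packet
{
    std::optional<Request> request; // empty unless the packet type says otherwise
};

// Consumers must check presence explicitly before use.
int processPacket(const Packet & packet)
{
    if (!packet.request.has_value())
        throw std::logic_error("protocol error: request expected but absent");
    return packet.request->replica_num;
}

int main()
{
    Packet packet{Request{7}};
    return processPacket(packet) == 7 ? 0 : 1;
}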

View File

@@ -9,6 +9,7 @@
#include <Common/logger_useful.h>
#include "libaccel_config.h"
#include <Common/MemorySanitizer.h>
#include <base/scope_guard.h>
namespace DB
{
@@ -34,6 +35,7 @@ DeflateQplJobHWPool::DeflateQplJobHWPool()
// Loop over all configured work queue sizes to get the maximum job number.
accfg_ctx * ctx_ptr = nullptr;
auto ctx_status = accfg_new(&ctx_ptr);
SCOPE_EXIT({ accfg_unref(ctx_ptr); });
if (ctx_status == 0)
{
auto * dev_ptr = accfg_device_get_first(ctx_ptr);
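
SCOPE_EXIT registers accfg_unref immediately after accfg_new, so the accel-config context is released on every exit path of the constructor, including early returns. A simplified stand-alone guard in the same spirit (the real macro in base/scope_guard.h generates a uniquely named guard object; this reduction is for illustration only):

#include <cstdio>

// Simplified scope guard: runs the stored callable when the guard is destroyed.
template <typename F>
struct ScopeGuard
{
    F fn;
    ~ScopeGuard() { fn(); }
};
template <typename F> ScopeGuard(F) -> ScopeGuard<F>; // deduction guide for C++17

int main()
{
    std::puts("acquire context");                           // e.g. accfg_new
    ScopeGuard guard{[] { std::puts("release context"); }}; // e.g. accfg_unref
    std::puts("use context");
    return 0; // "release context" runs here, and on any early return or exception
}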

View File

@@ -216,8 +216,24 @@ void DatabaseCatalog::shutdownImpl()
/// We still hold "databases" (instead of std::move) for Buffer tables to flush data correctly.
/// Delay shutdown of temporary and system databases. They will be shut down last.
/// Some databases might use them until their own shutdown is called, and calling shutdown
/// on the temporary database means clearing its set of tables, which would lead to unnecessary errors like "table not found".
std::vector<DatabasePtr> databases_with_delayed_shutdown;
for (auto & database : current_databases)
{
if (database.first == TEMPORARY_DATABASE || database.first == SYSTEM_DATABASE)
{
databases_with_delayed_shutdown.push_back(database.second);
continue;
}
database.second->shutdown();
}
for (auto & database : databases_with_delayed_shutdown)
{
database->shutdown();
}
{
std::lock_guard lock(tables_marked_dropped_mutex);

View File

@@ -434,11 +434,13 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
switch (packet.type)
{
case Protocol::Server::MergeTreeReadTaskRequest:
processMergeTreeReadTaskRequest(packet.request);
chassert(packet.request.has_value());
processMergeTreeReadTaskRequest(packet.request.value());
return ReadResult(ReadResult::Type::ParallelReplicasToken);
case Protocol::Server::MergeTreeAllRangesAnnounecement:
processMergeTreeInitialReadAnnounecement(packet.announcement);
chassert(packet.announcement.has_value());
processMergeTreeInitialReadAnnounecement(packet.announcement.value());
return ReadResult(ReadResult::Type::ParallelReplicasToken);
case Protocol::Server::ReadTaskRequest:

View File

@@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Storages/MergeTree/RequestResponse.h>
namespace ProfileEvents
@@ -433,8 +434,12 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t thread)
if (buffered_ranges.empty())
{
auto result = extension.callback(ParallelReadRequest{
.replica_num = extension.number_of_current_replica, .min_number_of_marks = min_marks_for_concurrent_read * threads});
auto result = extension.callback(ParallelReadRequest(
CoordinationMode::Default,
extension.number_of_current_replica,
min_marks_for_concurrent_read * threads,
/// For Default coordination mode we don't need to pass part names.
RangesInDataPartsDescription{}));
if (!result || result->finish)
{
@@ -529,12 +534,12 @@ MarkRanges MergeTreeInOrderReadPoolParallelReplicas::getNewTask(RangesInDataPart
if (no_more_tasks)
return {};
auto response = extension.callback(ParallelReadRequest{
.mode = mode,
.replica_num = extension.number_of_current_replica,
.min_number_of_marks = min_marks_for_concurrent_read * request.size(),
.description = request,
});
auto response = extension.callback(ParallelReadRequest(
mode,
extension.number_of_current_replica,
min_marks_for_concurrent_read * request.size(),
request
));
if (!response || response->description.empty() || response->finish)
{
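
These call sites switch from designated initializers to a real constructor. The practical difference: with an aggregate, any field omitted from a designated-initializer list is silently value-initialized, while a constructor forces every argument at every call site. A toy illustration with hypothetical types (C++20 for the designated initializer):

#include <cstddef>

// Aggregate: omitted fields are silently value-initialized.
struct AggregateRequest
{
    int mode = 0;
    std::size_t replica_num = 0;
    std::size_t min_number_of_marks = 0;
};

// Constructor: every field must be supplied at every call site.
struct CtorRequest
{
    CtorRequest(int mode_, std::size_t replica_num_, std::size_t min_number_of_marks_)
        : mode(mode_), replica_num(replica_num_), min_number_of_marks(min_number_of_marks_) {}

    int mode;
    std::size_t replica_num;
    std::size_t min_number_of_marks;
};

int main()
{
    AggregateRequest a{.replica_num = 1}; // compiles; mode and min_number_of_marks default silently
    CtorRequest c(0, 1, 128);             // all three arguments are mandatory
    // CtorRequest bad(0, 1);             // would not compile: missing argument
    return a.mode + static_cast<int>(c.min_number_of_marks) > 0 ? 0 : 1;
}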

View File

@@ -193,10 +193,11 @@ public:
predict_block_size_bytes, column_names, virtual_column_names, prewhere_info,
actions_settings, reader_settings, per_part_params);
extension.all_callback({
.description = parts_ranges.getDescriptions(),
.replica_num = extension.number_of_current_replica
});
extension.all_callback(InitialAllRangesAnnouncement(
CoordinationMode::Default,
parts_ranges.getDescriptions(),
extension.number_of_current_replica
));
}
~MergeTreeReadPoolParallelReplicas() override;
@@ -253,10 +254,11 @@ public:
for (const auto & part : parts_ranges)
buffered_tasks.push_back({part.data_part->info, MarkRanges{}});
extension.all_callback({
.description = parts_ranges.getDescriptions(),
.replica_num = extension.number_of_current_replica
});
extension.all_callback(InitialAllRangesAnnouncement(
mode,
parts_ranges.getDescriptions(),
extension.number_of_current_replica
));
}
MarkRanges getNewTask(RangesInDataPartDescription description);

View File

@@ -102,7 +102,6 @@ public:
explicit DefaultCoordinator(size_t replicas_count_)
: ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_)
, announcements(replicas_count_)
, reading_state(replicas_count_)
{
}
@@ -119,7 +118,6 @@ public:
PartitionToBlockRanges partitions;
size_t sent_initial_requests{0};
std::vector<InitialAllRangesAnnouncement> announcements;
Parts all_parts_to_read;
/// Contains only parts which we haven't started to read from

View File

@@ -51,7 +51,7 @@ String ParallelReadRequest::describe() const
return result;
}
void ParallelReadRequest::deserialize(ReadBuffer & in)
ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
{
UInt64 version;
readIntBinary(version, in);
@@ -60,12 +60,24 @@ void ParallelReadRequest::deserialize(ReadBuffer & in)
"from replicas differ. Got: {}, supported version: {}",
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode;
size_t replica_num;
size_t min_number_of_marks;
RangesInDataPartsDescription description;
uint8_t mode_candidate;
readIntBinary(mode_candidate, in);
mode = validateAndGet(mode_candidate);
readIntBinary(replica_num, in);
readIntBinary(min_number_of_marks, in);
description.deserialize(in);
return ParallelReadRequest(
mode,
replica_num,
min_number_of_marks,
std::move(description)
);
}
void ParallelReadRequest::merge(ParallelReadRequest & other)
@@ -125,7 +137,7 @@ String InitialAllRangesAnnouncement::describe()
return result;
}
void InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
{
UInt64 version;
readIntBinary(version, in);
@@ -134,11 +146,21 @@ void InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
"from replicas differ. Got: {}, supported version: {}",
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode;
RangesInDataPartsDescription description;
size_t replica_num;
uint8_t mode_candidate;
readIntBinary(mode_candidate, in);
mode = validateAndGet(mode_candidate);
description.deserialize(in);
readIntBinary(replica_num, in);
return InitialAllRangesAnnouncement {
mode,
description,
replica_num
};
}
}

View File

@@ -40,21 +40,40 @@ struct PartBlockRange
}
};
/// ParallelReadRequest is used by remote replicas during parallel reading
/// to signal the initiator that they need more marks to read.
struct ParallelReadRequest
{
/// No default constructor; you must initialize all fields at once.
ParallelReadRequest(
CoordinationMode mode_,
size_t replica_num_,
size_t min_number_of_marks_,
RangesInDataPartsDescription description_)
: mode(mode_)
, replica_num(replica_num_)
, min_number_of_marks(min_number_of_marks_)
, description(std::move(description_))
{}
CoordinationMode mode;
size_t replica_num;
size_t min_number_of_marks;
/// Extension for ordered mode
/// Extension for Ordered (InOrder or ReverseOrder) mode
/// Contains only data part names without mark ranges.
RangesInDataPartsDescription description;
void serialize(WriteBuffer & out) const;
String describe() const;
void deserialize(ReadBuffer & in);
static ParallelReadRequest deserialize(ReadBuffer & in);
void merge(ParallelReadRequest & other);
};
/// ParallelReadResponse is used by an initiator to tell
/// remote replicas what to read during parallel reading.
/// Additionally contains information on whether there are more available
/// marks to read (whether it is the last packet or not).
struct ParallelReadResponse
{
bool finish{false};
@@ -66,15 +85,30 @@ struct ParallelReadResponse
};
/// The set of parts (their names) along with ranges to read, which is sent back
/// to the initiator by remote replicas during parallel reading.
/// Additionally contains an identifier (replica_num) plus
/// the reading algorithm chosen (Default, InOrder or ReverseOrder).
struct InitialAllRangesAnnouncement
{
/// No default constructor; you must initialize all fields at once.
InitialAllRangesAnnouncement(
CoordinationMode mode_,
RangesInDataPartsDescription description_,
size_t replica_num_)
: mode(mode_)
, description(description_)
, replica_num(replica_num_)
{}
CoordinationMode mode;
RangesInDataPartsDescription description;
size_t replica_num;
void serialize(WriteBuffer & out) const;
String describe();
void deserialize(ReadBuffer & in);
static InitialAllRangesAnnouncement deserialize(ReadBuffer & in);
};
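
Taken together, the comments above describe an announce/request/response loop: a replica first announces all its parts, then repeatedly asks the initiator for work until the response's finish flag is set. A toy model of that flow, with mark ranges collapsed to plain counts (a sketch of the protocol shape under those simplifications, not ClickHouse code):

#include <algorithm>
#include <cstddef>

// Toy stand-ins: ranges are reduced to bare mark counts.
struct Announcement { std::size_t replica_num; std::size_t total_marks; };
struct Request      { std::size_t replica_num; std::size_t min_number_of_marks; };
struct Response     { bool finish; std::size_t granted_marks; };

// Initiator side: hands out marks until none remain.
struct Coordinator
{
    std::size_t marks_left = 0;
    void handleAnnouncement(const Announcement & a) { marks_left += a.total_marks; }
    Response handleRequest(const Request & r)
    {
        std::size_t granted = std::min(r.min_number_of_marks, marks_left);
        marks_left -= granted;
        return Response{marks_left == 0, granted};
    }
};

int main()
{
    Coordinator coordinator;
    coordinator.handleAnnouncement(Announcement{0, 100});          // InitialAllRangesAnnouncement

    std::size_t done = 0;
    for (bool finish = false; !finish;)                            // replica's read loop
    {
        Response resp = coordinator.handleRequest(Request{0, 40}); // ParallelReadRequest
        done += resp.granted_marks;                                // ParallelReadResponse
        finish = resp.finish;
    }
    return done == 100 ? 0 : 1;
}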

View File

@@ -40,7 +40,7 @@
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<heart_beat_interval_ms>1000</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>4000</election_timeout_lower_bound_ms>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>5000</election_timeout_upper_bound_ms>
<raft_logs_level>information</raft_logs_level>
<force_sync>false</force_sync>

View File

@@ -846,7 +846,7 @@ def test_start_stop_moves(start_cluster, name, engine):
node1.query("SYSTEM START MOVES {}".format(name))
# wait some time until the background backoff finishes
retry = 30
retry = 60
i = 0
while not sum(1 for x in used_disks if x == "jbod1") <= 2 and i < retry:
time.sleep(1)