Merge branch 'master' into io-scheduler

Sergei Trifonov 2022-09-29 13:48:14 +02:00 committed by GitHub
commit 61e12fc54a
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 232 additions and 70 deletions

View File

@ -15,7 +15,7 @@ then
# If the system has >=ARMv8.2 (https://en.wikipedia.org/wiki/AArch64), choose the corresponding build, else fall back to a v8.0
# compat build. Unfortunately, the ARM ISA level cannot be read directly, we need to guess from the "features" in /proc/cpuinfo.
# Also, the flags in /proc/cpuinfo are named differently than the flags passed to the compiler (cmake/cpu_features.cmake).
ARMV82=$(grep -m 1 'Features' /proc/cpuinfo | awk '/asimd/ && /sha1/ && /aes/ && /atomics/')
ARMV82=$(grep -m 1 'Features' /proc/cpuinfo | awk '/asimd/ && /sha1/ && /aes/ && /atomics/ && /lrcpc/')
if [ "${ARMV82}" ]
then
DIR="aarch64"
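
The check above can be illustrated outside the shell script. The following is a minimal C++ sketch, not code from this commit: `hasAllFeatures` and `looksLikeArmV82` are hypothetical names, and the sketch matches whole flag names, whereas the `awk` patterns in the script match substrings of the line.

```cpp
#include <cassert>
#include <initializer_list>
#include <set>
#include <sstream>
#include <string>

// Hypothetical helper (not part of the script): returns true when every flag in
// `required` appears as a whole word in a "Features" line from /proc/cpuinfo.
bool hasAllFeatures(const std::string & features_line, std::initializer_list<const char *> required)
{
    // Drop the "Features :" label, if present.
    const auto colon = features_line.find(':');
    std::istringstream in(colon == std::string::npos ? features_line : features_line.substr(colon + 1));

    // Collect the whitespace-separated flags.
    std::set<std::string> present;
    for (std::string flag; in >> flag;)
        present.insert(flag);

    for (const char * flag : required)
    {
        assert(flag != nullptr);
        if (!present.count(flag))
            return false;
    }
    return true;
}

// Uses the same flag list as the script after this change (including lrcpc).
bool looksLikeArmV82(const std::string & features_line)
{
    return hasAllFeatures(features_line, {"asimd", "sha1", "aes", "atomics", "lrcpc"});
}
```

With this sketch, a line that lists `atomics` but not `lrcpc` is rejected, which corresponds to falling back to the v8.0 compat build.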

View File

@ -5,7 +5,7 @@ sidebar_position: 103
# anyHeavy
Selects a frequently occurring value using the [heavy hitters](http://www.cs.umd.edu/~samir/498/karp.pdf) algorithm. If there is a value that occurs more than in half the cases in each of the query's execution threads, this value is returned. Normally, the result is nondeterministic.
Selects a frequently occurring value using the [heavy hitters](https://doi.org/10.1145/762471.762473) algorithm. If there is a value that occurs more than in half the cases in each of the query's execution threads, this value is returned. Normally, the result is nondeterministic.
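
One classic way to get the "more than half" guarantee is the Boyer-Moore majority vote. The sketch below is a single-threaded illustration under that assumption, not the server's implementation; `majorityCandidate` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Illustrative only: Boyer-Moore majority vote over one in-memory sequence.
// If some value occupies more than half of `values`, it is returned. Otherwise
// the returned candidate is arbitrary (std::nullopt only for empty input).
std::optional<std::string> majorityCandidate(const std::vector<std::string> & values)
{
    std::optional<std::string> candidate;
    std::size_t counter = 0;
    for (const auto & value : values)
    {
        if (counter == 0)
        {
            // No surviving candidate: adopt the current value.
            candidate = value;
            counter = 1;
        }
        else
        {
            assert(candidate.has_value());
            if (value == *candidate)
                ++counter; // a vote for the candidate
            else
                --counter; // a vote against cancels one vote for
        }
    }
    return candidate;
}
```

When no value holds a majority, the candidate depends on the input order, which is in line with the note that the result is normally nondeterministic.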
``` sql
anyHeavy(column)

View File

@ -7,7 +7,7 @@ sidebar_position: 108
Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves).
Implements the [Filtered Space-Saving](http://www.l2f.inesc-id.pt/~fmmb/wiki/uploads/Work/misnis.ref0a.pdf) algorithm for analyzing TopK, based on the reduce-and-combine algorithm from [Parallel Space Saving](https://arxiv.org/pdf/1401.0702.pdf).
Implements the [Filtered Space-Saving](https://doi.org/10.1016/j.ins.2010.08.024) algorithm for analyzing TopK, based on the reduce-and-combine algorithm from [Parallel Space Saving](https://doi.org/10.1016/j.ins.2015.09.003).
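
For orientation, here is a minimal sketch of the plain Space-Saving counter that the cited algorithms build on. It assumes a single thread and leaves out both the filter and the reduce-and-combine merge step, so it is not the server's implementation; `SpaceSavingSketch` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Illustrative, single-threaded Space-Saving counter (no filter, no merge step).
class SpaceSavingSketch
{
public:
    explicit SpaceSavingSketch(std::size_t capacity_) : capacity(capacity_)
    {
        assert(capacity > 0);
    }

    void add(const std::string & item)
    {
        auto it = counts.find(item);
        if (it != counts.end())
        {
            ++it->second; // already monitored: just count it
            return;
        }
        if (counts.size() < capacity)
        {
            counts.emplace(item, 1); // free slot: start monitoring
            return;
        }
        // Table is full: evict the item with the smallest counter and let the
        // newcomer inherit that counter plus one (hence "approximate" counts).
        auto min_it = std::min_element(counts.begin(), counts.end(),
            [](const auto & a, const auto & b) { return a.second < b.second; });
        const std::uint64_t inherited = min_it->second;
        counts.erase(min_it);
        counts.emplace(item, inherited + 1);
    }

    // Monitored items, most frequent first (ties broken by name for determinism).
    std::vector<std::pair<std::string, std::uint64_t>> top() const
    {
        std::vector<std::pair<std::string, std::uint64_t>> result(counts.begin(), counts.end());
        std::sort(result.begin(), result.end(), [](const auto & a, const auto & b)
        {
            return a.second != b.second ? a.second > b.second : a.first < b.first;
        });
        return result;
    }

private:
    std::size_t capacity;
    std::unordered_map<std::string, std::uint64_t> counts;
};
```

Because an evicted item's counter is inherited, reported counts can overestimate true frequencies, which is why the returned array is described as approximate.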
``` sql
topK(N)(column)

View File

@ -565,6 +565,10 @@ Result:
└────────────────────────────┘
```
## tryBase58Decode(s)
Similar to base58Decode, but returns an empty string in case of error.
## base64Encode(s)
Encodes s string into base64
@ -579,7 +583,7 @@ Alias: `FROM_BASE64`.
## tryBase64Decode(s)
Similar to base64Decode, but in case of error an empty string would be returned.
Similar to base64Decode, but returns an empty string in case of error.
## endsWith(s, suffix)

View File

@ -5,7 +5,7 @@ sidebar_position: 103
# anyHeavy {#anyheavyx}
Selects a frequently occurring value using the [heavy hitters](http://www.cs.umd.edu/~samir/498/karp.pdf) algorithm. If there is a value that occurs in more than half of the cases in each query execution thread, that value is returned. In general, the result is nondeterministic.
Selects a frequently occurring value using the [heavy hitters](https://doi.org/10.1145/762471.762473) algorithm. If there is a value that occurs in more than half of the cases in each query execution thread, that value is returned. In general, the result is nondeterministic.
``` sql
anyHeavy(column)

View File

@ -7,7 +7,7 @@ sidebar_position: 108
Returns an array of the most frequent values in the specified column. The resulting array is sorted in descending order of value frequency (not by the values themselves).
Implements the [Filtered Space-Saving](http://www.l2f.inesc-id.pt/~fmmb/wiki/uploads/Work/misnis.ref0a.pdf) algorithm for analyzing TopK, based on the reduce-and-combine algorithm from the [Parallel Space Saving](https://arxiv.org/pdf/1401.0702.pdf) method.
Implements the [Filtered Space-Saving](https://doi.org/10.1016/j.ins.2010.08.024) algorithm for analyzing TopK, based on the reduce-and-combine algorithm from the [Parallel Space Saving](https://doi.org/10.1016/j.ins.2015.09.003) method.
``` sql
topK(N)(column)

View File

@ -32,7 +32,7 @@ sidebar_label: FROM
Queries that use `FINAL` run slightly slower than similar queries without it, because:
- Data is merged during query execution.
- Data is merged in memory during query execution, and this does not lead to a physical merge of parts on disk.
- Queries with the `FINAL` modifier read the primary key columns in addition to the columns used in the query.
**In most cases, avoid using `FINAL`.** The common approach is to use aggregating queries that assume the background processes of the `MergeTree` engine family have not happened yet (for example, queries that discard duplicates themselves). {## TODO: examples ##}

View File

@ -5,7 +5,7 @@ sidebar_position: 103
# anyHeavy {#anyheavyx}
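Selects a frequently occurring value using the [heavy hitters](http://www.cs.umd.edu/~samir/498/karp.pdf) algorithm. If a value occurs in more than half of the cases in each of the query's execution threads, that value is returned. Normally, the result is nondeterministic.
Selects a frequently occurring value using the [heavy hitters](https://doi.org/10.1145/762471.762473) algorithm. If a value occurs in more than half of the cases in each of the query's execution threads, that value is returned. Normally, the result is nondeterministic.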
``` sql
anyHeavy(column)

View File

@ -7,7 +7,7 @@ sidebar_position: 108
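Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves).
Implements the [Filtered Space-Saving](http://www.l2f.inesc-id.pt/~fmmb/wiki/uploads/Work/misnis.ref0a.pdf) algorithm, using a reduce-and-combine based algorithm borrowed from [Parallel Space Saving](https://arxiv.org/pdf/1401.0702.pdf).
Implements the [Filtered Space-Saving](https://doi.org/10.1016/j.ins.2010.08.024) algorithm, using a reduce-and-combine based algorithm borrowed from [Parallel Space Saving](https://doi.org/10.1016/j.ins.2015.09.003).
**Syntax**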

View File

@ -1,4 +1,4 @@
#include <Common/base58.h>
#include <Common/Base58.h>
namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
#include <Core/Types.h>
#include <base/types.h>
#include <optional>

View File

@ -615,10 +615,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
}
else
{
LOG_TEST(log, "Bypassing cache because writeCache method failed");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TEST(log, "Bypassing cache because writeCache (in predownload) method failed");
continue_predownload = false;
}
}

View File

@ -6,7 +6,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/base58.h>
#include <Common/Base58.h>
#include <cstring>
@ -62,9 +62,16 @@ struct Base58Encode
}
};
enum class Base58DecodeErrorHandling
{
ThrowException,
ReturnEmptyString
};
template <typename Name, Base58DecodeErrorHandling ErrorHandling>
struct Base58Decode
{
static constexpr auto name = "base58Decode";
static constexpr auto name = Name::name;
static void process(const ColumnString & src_column, ColumnString::MutablePtr & dst_column, size_t input_rows_count)
{
@ -93,7 +100,12 @@ struct Base58Decode
size_t src_length = current_src_offset - prev_src_offset - 1;
std::optional<size_t> decoded_size = decodeBase58(&src[prev_src_offset], src_length, &dst[current_dst_offset]);
if (!decoded_size)
throw Exception("Invalid Base58 value, cannot be decoded", ErrorCodes::BAD_ARGUMENTS);
{
if constexpr (ErrorHandling == Base58DecodeErrorHandling::ThrowException)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid Base58 value, cannot be decoded");
else
decoded_size = 0;
}
prev_src_offset = current_src_offset;
current_dst_offset += *decoded_size;
@ -113,33 +125,23 @@ class FunctionBase58Conversion : public IFunction
public:
static constexpr auto name = Func::name;
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionBase58Conversion>();
}
String getName() const override
{
return Func::name;
}
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionBase58Conversion>(); }
String getName() const override { return Func::name; }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1)
throw Exception("Wrong number of arguments for function " + getName() + ": 1 expected.", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong number of arguments for function {}: 1 expected.", getName());
if (!isString(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of first argument of function " + getName() + ". Must be String.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of function {}. Must be String.",
arguments[0].type->getName(), getName());
return std::make_shared<DataTypeString>();
}
@ -150,8 +152,9 @@ public:
const ColumnString * input = checkAndGetColumn<ColumnString>(column_string.get());
if (!input)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of first argument of function " + getName() + ", must be String",
ErrorCodes::ILLEGAL_COLUMN);
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}, must be String",
arguments[0].column->getName(), getName());
auto dst_column = ColumnString::create();

View File

@ -19,6 +19,7 @@ using namespace GatherUtils;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int INCORRECT_DATA;
@ -59,34 +60,22 @@ class FunctionBase64Conversion : public IFunction
public:
static constexpr auto name = Func::name;
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionBase64Conversion>();
}
String getName() const override
{
return Func::name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionBase64Conversion>(); }
String getName() const override { return Func::name; }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override
{
return true;
}
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong number of arguments for function {}: 1 expected.", getName());
if (!WhichDataType(arguments[0].type).isString())
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of 1st argument of function " + getName() + ". Must be String.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of 1st argument of function {}. Must be String.",
arguments[0].type->getName(), getName());
return std::make_shared<DataTypeString>();
}
@ -98,8 +87,9 @@ public:
if (!input)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of first argument of function " + getName() + ", must be of type String",
ErrorCodes::ILLEGAL_COLUMN);
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}, must be of type String",
arguments[0].column->getName(), getName());
auto dst_column = ColumnString::create();
auto & dst_data = dst_column->getChars();
@ -145,7 +135,10 @@ public:
#endif
if (!outlen)
throw Exception("Failed to " + getName() + " input '" + String(reinterpret_cast<const char *>(source), srclen) + "'", ErrorCodes::INCORRECT_DATA);
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Failed to {} input '{}'",
getName(), String(reinterpret_cast<const char *>(source), srclen));
}
}
else

View File

@ -0,0 +1,24 @@
#include <Functions/FunctionBase58Conversion.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
namespace
{
struct NameBase58Decode
{
static constexpr auto name = "base58Decode";
};
using Base58DecodeImpl = Base58Decode<NameBase58Decode, Base58DecodeErrorHandling::ThrowException>;
using FunctionBase58Decode = FunctionBase58Conversion<Base58DecodeImpl>;
}
REGISTER_FUNCTION(Base58Decode)
{
factory.registerFunction<FunctionBase58Decode>();
}
}

View File

@ -7,9 +7,4 @@ REGISTER_FUNCTION(Base58Encode)
{
factory.registerFunction<FunctionBase58Conversion<Base58Encode>>();
}
REGISTER_FUNCTION(Base58Decode)
{
factory.registerFunction<FunctionBase58Conversion<Base58Decode>>();
}
}

View File

@ -0,0 +1,24 @@
#include <Functions/FunctionBase58Conversion.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
namespace
{
struct NameTryBase58Decode
{
static constexpr auto name = "tryBase58Decode";
};
using TryBase58DecodeImpl = Base58Decode<NameTryBase58Decode, Base58DecodeErrorHandling::ReturnEmptyString>;
using FunctionTryBase58Decode = FunctionBase58Conversion<TryBase58DecodeImpl>;
}
REGISTER_FUNCTION(TryBase58Decode)
{
factory.registerFunction<FunctionTryBase58Decode>();
}
}
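
The two registrations differ only in the name struct and the error-handling enumerator passed to `Base58Decode`. The standalone sketch below shows that pattern in isolation. It is an illustration under assumptions, not code from this commit: it uses a stand-in hex decoder instead of Base58 to stay short, throws `std::invalid_argument` instead of the project's `Exception`, and every name in it (`DecodeErrorHandling`, `decodeHex`, `Decode`, `StrictDecode`, `TryDecode`) is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>

enum class DecodeErrorHandling
{
    ThrowException,
    ReturnEmptyString
};

// Stand-in decoder (hex, not Base58): returns std::nullopt on malformed input.
std::optional<std::string> decodeHex(std::string_view src)
{
    auto nibble = [](char c) -> int
    {
        if (c >= '0' && c <= '9') return c - '0';
        if (c >= 'a' && c <= 'f') return c - 'a' + 10;
        if (c >= 'A' && c <= 'F') return c - 'A' + 10;
        return -1;
    };

    if (src.size() % 2 != 0)
        return std::nullopt;

    std::string out;
    for (std::size_t i = 0; i < src.size(); i += 2)
    {
        const int hi = nibble(src[i]);
        const int lo = nibble(src[i + 1]);
        if (hi < 0 || lo < 0)
            return std::nullopt;
        out.push_back(static_cast<char>(hi * 16 + lo));
    }
    return out;
}

struct NameDecode { static constexpr auto name = "hexDecode"; };
struct NameTryDecode { static constexpr auto name = "tryHexDecode"; };

// The policy is a template parameter, so the unused branch is discarded at
// compile time and each instantiation carries its own function name.
template <typename Name, DecodeErrorHandling ErrorHandling>
struct Decode
{
    static constexpr auto name = Name::name;

    static std::string process(std::string_view src)
    {
        std::optional<std::string> decoded = decodeHex(src);
        if (!decoded)
        {
            if constexpr (ErrorHandling == DecodeErrorHandling::ThrowException)
                throw std::invalid_argument(std::string(name) + ": invalid input, cannot be decoded");
            else
                return {};
        }
        assert(decoded.has_value());
        return *decoded;
    }
};

using StrictDecode = Decode<NameDecode, DecodeErrorHandling::ThrowException>;
using TryDecode = Decode<NameTryDecode, DecodeErrorHandling::ReturnEmptyString>;
```

In this sketch `StrictDecode::process("zz")` throws, while `TryDecode::process("zz")` returns an empty string, mirroring the documented difference between `base58Decode` and `tryBase58Decode`.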

View File

@ -7,7 +7,8 @@
:main jepsen.clickhouse-keeper.main
:plugins [[lein-cljfmt "0.7.0"]]
:dependencies [[org.clojure/clojure "1.10.1"]
[jepsen "0.2.6"]
[jepsen "0.2.7"]
[zookeeper-clj "0.9.4"]
[com.hierynomus/sshj "0.34.0"]
[org.apache.zookeeper/zookeeper "3.6.1" :exclusions [org.slf4j/slf4j-log4j12]]]
:repl-options {:init-ns jepsen.clickhouse-keeper.main})

View File

@ -1,3 +1,98 @@
(ns jepsen.control.sshj
(:require [jepsen.control [core :as core]
[sshj :as sshj]]
[slingshot.slingshot :refer [try+ throw+]])
(:import (net.schmizz.sshj SSHClient
DefaultConfig)
(net.schmizz.sshj.transport.verification PromiscuousVerifier)
(java.util.concurrent Semaphore)))
(defrecord SSHJRemote [concurrency-limit
conn-spec
^SSHClient client
^Semaphore semaphore]
core/Remote
(connect [this conn-spec]
(if (:dummy conn-spec)
(assoc this :conn-spec conn-spec)
(try+ (let [c (as-> (SSHClient.) client
(do
(if (:strict-host-key-checking conn-spec)
(.loadKnownHosts client)
(.addHostKeyVerifier client (PromiscuousVerifier.)))
(.connect client (:host conn-spec) (:port conn-spec))
(auth! client conn-spec)
client))]
(assoc this
:conn-spec conn-spec
:client c
:semaphore (Semaphore. concurrency-limit true)))
(catch Exception e
; SSHJ wraps InterruptedException in its own exceptions, so we
; have to see through that and rethrow properly.
(let [cause (util/ex-root-cause e)]
(when (instance? InterruptedException cause)
(throw cause)))
(throw+ (assoc conn-spec
:type :jepsen.control/session-error
:message "Error opening SSH session. Verify username, password, and node hostnames are correct."))))))
(disconnect! [this]
(when-let [c client]
(.close c)))
(execute! [this ctx action]
; (info :permits (.availablePermits semaphore))
(when (:dummy conn-spec)
(throw+ {:type :jepsen.control/dummy}))
(.acquire semaphore)
(sshj/with-errors conn-spec ctx
(try
(with-open [session (.startSession client)]
(let [cmd (.exec session (:cmd action))
; Feed it input
_ (when-let [input (:in action)]
(let [stream (.getOutputStream cmd)]
(bs/transfer input stream)
(send-eof! client session)
(.close stream)))
; Read output
out (.toString (IOUtils/readFully (.getInputStream cmd)))
err (.toString (IOUtils/readFully (.getErrorStream cmd)))
; Wait on command
_ (.join cmd)]
; Return completion
(assoc action
:out out
:err err
; There's also a .getExitErrorMessage that might be
; interesting here?
:exit (.getExitStatus cmd))))
(finally
(.release semaphore)))))
(upload! [this ctx local-paths remote-path _opts]
(when (:dummy conn-spec)
(throw+ {:type :jepsen.control/dummy}))
(with-errors conn-spec ctx
(with-open [sftp (.newSFTPClient client)]
(.put sftp (FileSystemFile. local-paths) remote-path))))
(download! [this ctx remote-paths local-path _opts]
(when (:dummy conn-spec)
(throw+ {:type :jepsen.control/dummy}))
(with-errors conn-spec ctx
(with-open [sftp (.newSFTPClient client)]
(.get sftp remote-paths (FileSystemFile. local-path))))))
(defn remote
"Constructs an SSHJ remote."
[]
(-> (SSHJRemote. concurrency-limit nil nil nil)
; We *can* use our own SCP, but shelling out is faster.
scp/remote
retry/remote))
(ns jepsen.clickhouse-keeper.main
(:require [clojure.tools.logging :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
@ -17,7 +112,6 @@
[checker :as checker]
[cli :as cli]
[client :as client]
[control :as c]
[db :as db]
[nemesis :as nemesis]
[generator :as gen]

View File

@ -1,8 +1,16 @@
-- Tags: no-fasttest
SET send_logs_level = 'fatal';
SELECT base64Encode(val) FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar']) val);
SELECT base64Decode(val) FROM (select arrayJoin(['', 'Zg==', 'Zm8=', 'Zm9v', 'Zm9vYg==', 'Zm9vYmE=', 'Zm9vYmFy']) val);
SELECT base64Decode(base64Encode('foo')) = 'foo', base64Encode(base64Decode('Zm9v')) == 'Zm9v';
SELECT tryBase64Decode('Zm9vYmF=Zm9v');
SELECT base64Decode('Zm9vYmF=Zm9v'); -- { serverError 117 }
SELECT base64Encode(val, 'excess argument') FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar']) val); -- { serverError 42 }
SELECT base64Decode(val, 'excess argument') FROM (select arrayJoin(['', 'Zg==', 'Zm8=', 'Zm9v', 'Zm9vYg==', 'Zm9vYmE=', 'Zm9vYmFy']) val); -- { serverError 42 }
SELECT tryBase64Decode('Zm9vYmF=Zm9v', 'excess argument'); -- { serverError 42 }
SELECT base64Decode('Zm9vYmF=Zm9v'); -- { serverError 117 }

View File

@ -8,6 +8,22 @@ fooba
foobar
Hello world!
f
fo
foo
foob
fooba
foobar
Hello world!
foob
foobar
2m
8o8
bQbp

View File

@ -5,6 +5,8 @@ SELECT base58Encode('Hold my beer...', 'Second arg'); -- { serverError 42 }
SELECT base58Decode('Hold my beer...'); -- { serverError 36 }
SELECT base58Decode(encoded) FROM (SELECT base58Encode(val) as encoded FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar', 'Hello world!']) val));
SELECT tryBase58Decode(encoded) FROM (SELECT base58Encode(val) as encoded FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar', 'Hello world!']) val));
SELECT tryBase58Decode(val) FROM (SELECT arrayJoin(['Hold my beer', 'Hold another beer', '3csAg9', 'And a wine', 'And another wine', 'And a lemonade', 't1Zv2yaZ', 'And another wine']) val);
SELECT base58Encode(val) FROM (select arrayJoin(['', 'f', 'fo', 'foo', 'foob', 'fooba', 'foobar']) val);
SELECT base58Decode(val) FROM (select arrayJoin(['', '2m', '8o8', 'bQbp', '3csAg9', 'CZJRhmz', 't1Zv2yaZ', '']) val);

View File

@ -908,6 +908,7 @@ trimBoth
trimLeft
trimRight
trunc
tryBase58Decode
tumble
tumbleEnd
tumbleStart

View File

@ -15,5 +15,5 @@ AND name NOT IN (
'h3ToGeoBoundary', 'h3ToParent', 'h3ToString', 'h3UnidirectionalEdgeIsValid', 'h3kRing', 'stringToH3',
'geoToS2', 's2CapContains', 's2CapUnion', 's2CellsIntersect', 's2GetNeighbors', 's2RectAdd', 's2RectContains', 's2RectIntersection', 's2RectUnion', 's2ToGeo',
'normalizeUTF8NFC', 'normalizeUTF8NFD', 'normalizeUTF8NFKC', 'normalizeUTF8NFKD',
'lemmatize', 'tokenize', 'stem', 'synonyms'
'lemmatize', 'tokenize', 'stem', 'synonyms' -- these functions are not enabled in fast test
) ORDER BY name;