Merge remote-tracking branch 'ClickHouse/master' into order_by_bug_fix

This commit is contained in:
Robert Schulze 2024-02-01 11:31:03 +00:00
commit b9ab377c7b
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
20 changed files with 157 additions and 65 deletions

View File

@ -2,6 +2,7 @@ use prql_compiler::sql::Dialect;
use prql_compiler::{Options, Target};
use std::ffi::{c_char, CString};
use std::slice;
use std::panic;
fn set_output(result: String, out: *mut *mut u8, out_size: *mut u64) {
assert!(!out_size.is_null());
@ -13,8 +14,7 @@ fn set_output(result: String, out: *mut *mut u8, out_size: *mut u64) {
*out_ptr = CString::new(result).unwrap().into_raw() as *mut u8;
}
#[no_mangle]
pub unsafe extern "C" fn prql_to_sql(
pub unsafe extern "C" fn prql_to_sql_impl(
query: *const u8,
size: u64,
out: *mut *mut u8,
@ -50,6 +50,23 @@ pub unsafe extern "C" fn prql_to_sql(
}
}
/// C ABI entry point that forwards to [`prql_to_sql_impl`] while preventing a
/// Rust panic from unwinding across the FFI boundary.
///
/// Returns whatever `prql_to_sql_impl` returns when it completes normally, and
/// `1` when it panics. This wrapper itself never writes to `out` / `out_size`;
/// on the panic path they hold whatever `prql_to_sql_impl` left there.
///
/// # Safety
///
/// The pointer arguments are passed through unchanged to `prql_to_sql_impl`,
/// so the caller must uphold whatever validity requirements that function
/// places on `query`, `size`, `out` and `out_size`.
#[no_mangle]
pub unsafe extern "C" fn prql_to_sql(
    query: *const u8,
    size: u64,
    out: *mut *mut u8,
    out_size: *mut u64,
) -> i64 {
    // NOTE: using cxxbridge we can return proper Result<> type.
    let outcome = panic::catch_unwind(|| prql_to_sql_impl(query, size, out, out_size));

    // A caught panic is reported with the same status code as before: 1.
    outcome.unwrap_or(1)
}
#[no_mangle]
pub unsafe extern "C" fn prql_free_pointer(ptr_to_free: *mut u8) {
std::mem::drop(CString::from_raw(ptr_to_free as *mut c_char));

View File

@ -1,6 +1,7 @@
use skim::prelude::*;
use term::terminfo::TermInfo;
use cxx::{CxxString, CxxVector};
use std::panic;
#[cxx::bridge]
mod ffi {
@ -36,7 +37,7 @@ impl SkimItem for Item {
}
}
fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> {
fn skim_impl(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> {
// Check whether a terminal is available, to avoid a panic.
if let Err(err) = TermInfo::from_env() {
return Err(format!("{}", err));
@ -89,3 +90,22 @@ fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, Stri
}
return Ok(output.selected_items[0].output().to_string());
}
/// Panic-safe wrapper around `skim_impl`.
///
/// Runs `skim_impl(prefix, words)` inside `catch_unwind` so that a Rust panic
/// is turned into an `Err(String)` instead of unwinding into the caller.
///
/// # Errors
///
/// Returns whatever error `skim_impl` returns, or a `"Rust panic: ..."`
/// message describing the panic payload if `skim_impl` panicked.
fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> {
    match panic::catch_unwind(|| skim_impl(prefix, words)) {
        Ok(res) => res,
        Err(err) => {
            // Panic payloads are usually a `String` (formatted `panic!`) or a
            // `&'static str` (literal `panic!`); anything else is reported by type id.
            let e = if let Some(s) = err.downcast_ref::<String>() {
                s.clone()
            } else if let Some(s) = err.downcast_ref::<&str>() {
                (*s).to_string()
            } else {
                // Dereference the box first: calling `type_id()` directly on a
                // `Box<dyn Any + Send>` can resolve to the `Any` impl of the box
                // itself and report the box's type instead of the payload's.
                format!("Unknown panic type: {:?}", (*err).type_id())
            };
            Err(format!("Rust panic: {:?}", e))
        }
    }
}

View File

@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
@ -30,12 +31,12 @@ class ApproxSampler
public:
struct Stats
{
T value; // the sampled value
Int64 g; // the minimum rank jump from the previous value's minimum rank
Int64 delta; // the maximum span of the rank
T value; // The sampled value
Int64 g; // The minimum rank jump from the previous value's minimum rank
Int64 delta; // The maximum span of the rank
Stats() = default;
Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) {}
Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) { }
};
struct QueryResult
@ -49,20 +50,20 @@ public:
ApproxSampler() = default;
explicit ApproxSampler(
double relative_error_,
size_t compress_threshold_ = default_compress_threshold,
size_t count_ = 0,
bool compressed_ = false)
: relative_error(relative_error_)
, compress_threshold(compress_threshold_)
, count(count_)
, compressed(compressed_)
ApproxSampler(const ApproxSampler & other)
: relative_error(other.relative_error)
, compress_threshold(other.compress_threshold)
, count(other.count)
, compressed(other.compressed)
, sampled(other.sampled.begin(), other.sampled.end())
, backup_sampled(other.backup_sampled.begin(), other.backup_sampled.end())
, head_sampled(other.head_sampled.begin(), other.head_sampled.end())
{
sampled.reserve(compress_threshold);
backup_sampled.reserve(compress_threshold);
}
head_sampled.reserve(default_head_size);
explicit ApproxSampler(double relative_error_)
: relative_error(relative_error_), compress_threshold(default_compress_threshold), count(0), compressed(false)
{
}
bool isCompressed() const { return compressed; }
@ -95,9 +96,9 @@ public:
Int64 current_max = std::numeric_limits<Int64>::min();
for (const auto & stats : sampled)
current_max = std::max(stats.delta + stats.g, current_max);
Int64 target_error = current_max/2;
Int64 target_error = current_max / 2;
size_t index= 0;
size_t index = 0;
auto min_rank = sampled[0].g;
for (size_t i = 0; i < size; ++i)
{
@ -118,7 +119,6 @@ public:
result[indices[i]] = res.value;
}
}
}
void compress()
@ -256,16 +256,27 @@ public:
void read(ReadBuffer & buf)
{
readBinaryLittleEndian(compress_threshold, buf);
if (compress_threshold != default_compress_threshold)
throw Exception(
ErrorCodes::INCORRECT_DATA,
"The compress threshold {} isn't the expected one {}",
compress_threshold,
default_compress_threshold);
readBinaryLittleEndian(relative_error, buf);
readBinaryLittleEndian(count, buf);
size_t sampled_len = 0;
readBinaryLittleEndian(sampled_len, buf);
if (sampled_len > compress_threshold)
throw Exception(
ErrorCodes::INCORRECT_DATA, "The number of elements {} for quantileGK exceeds {}", sampled_len, compress_threshold);
sampled.resize(sampled_len);
for (size_t i = 0; i < sampled_len; ++i)
{
auto stats = sampled[i];
auto & stats = sampled[i];
readBinaryLittleEndian(stats.value, buf);
readBinaryLittleEndian(stats.g, buf);
readBinaryLittleEndian(stats.delta, buf);
@ -291,7 +302,7 @@ private:
min_rank += curr_sample.g;
}
}
return {sampled.size()-1, 0, sampled.back().value};
return {sampled.size() - 1, 0, sampled.back().value};
}
void withHeadBufferInserted()
@ -389,12 +400,11 @@ private:
double relative_error;
size_t compress_threshold;
size_t count = 0;
size_t count;
bool compressed;
PaddedPODArray<Stats> sampled;
PaddedPODArray<Stats> backup_sampled;
PaddedPODArray<T> head_sampled;
static constexpr size_t default_compress_threshold = 10000;
@ -406,17 +416,14 @@ class QuantileGK
{
private:
using Data = ApproxSampler<Value>;
mutable Data data;
Data data;
public:
QuantileGK() = default;
explicit QuantileGK(size_t accuracy) : data(1.0 / static_cast<double>(accuracy)) { }
void add(const Value & x)
{
data.insert(x);
}
void add(const Value & x) { data.insert(x); }
template <typename Weight>
void add(const Value &, const Weight &)
@ -429,22 +436,34 @@ public:
if (!data.isCompressed())
data.compress();
data.merge(rhs.data);
if (rhs.data.isCompressed())
data.merge(rhs.data);
else
{
/// We can't modify rhs, so copy it and compress
Data rhs_data_copy(rhs.data);
rhs_data_copy.compress();
data.merge(rhs_data_copy);
}
}
void serialize(WriteBuffer & buf) const
{
/// Always compress before serialization
if (!data.isCompressed())
data.compress();
data.write(buf);
if (data.isCompressed())
data.write(buf);
else
{
/// We can't modify data in a const method, so copy it and compress
Data data_copy(data);
data_copy.compress();
data_copy.write(buf);
}
}
/// Restores the sampler state from `buf` by delegating to `data.read(buf)`,
/// then flags the restored state as compressed.
void deserialize(ReadBuffer & buf)
{
data.read(buf);
/// Serialized data is always compressed (serialize() only ever writes a compressed
/// sampler), so mark the freshly read state accordingly.
data.setCompressed();
}
@ -481,7 +500,6 @@ public:
}
};
template <typename Value, bool _> using FuncQuantileGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantileGK, false, void, false, true>;
template <typename Value, bool _> using FuncQuantilesGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantilesGK, false, void, true, true>;

View File

@ -136,12 +136,12 @@ namespace
{
void assertDigest(
const KeeperStorage::Digest & first,
const KeeperStorage::Digest & second,
const KeeperStorage::Digest & expected,
const KeeperStorage::Digest & actual,
const Coordination::ZooKeeperRequest & request,
bool committing)
{
if (!KeeperStorage::checkDigest(first, second))
if (!KeeperStorage::checkDigest(expected, actual))
{
LOG_FATAL(
getLogger("KeeperStateMachine"),
@ -149,9 +149,9 @@ void assertDigest(
"{}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
committing ? "committing" : "preprocessing",
request.getOpNum(),
first.value,
second.value,
first.version,
expected.value,
actual.value,
expected.version,
request.toString());
std::terminate();
}

View File

@ -174,7 +174,6 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Kee
hash.update(data);
hash.update(stat.czxid);
hash.update(stat.czxid);
hash.update(stat.mzxid);
hash.update(stat.ctime);
@ -183,7 +182,6 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Kee
hash.update(stat.cversion);
hash.update(stat.aversion);
hash.update(stat.ephemeralOwner);
hash.update(data.length());
hash.update(stat.numChildren);
hash.update(stat.pzxid);
@ -2531,6 +2529,17 @@ void KeeperStorage::recalculateStats()
container.recalculateDataSize();
}
/// Returns true when the two digests are considered consistent.
/// Digests are only comparable when both carry the same version and that version
/// is not NO_DIGEST; digests that are not comparable are treated as matching.
bool KeeperStorage::checkDigest(const Digest & first, const Digest & second)
{
    const bool same_version = first.version == second.version;
    const bool comparable = same_version && first.version != DigestVersion::NO_DIGEST;

    /// Nothing to verify unless both sides were computed by the same real digest version.
    return !comparable || first.value == second.value;
}
String KeeperStorage::generateDigest(const String & userdata)
{
std::vector<String> user_password;

View File

@ -95,10 +95,11 @@ public:
{
NO_DIGEST = 0,
V1 = 1,
V2 = 2 // added system nodes that modify the digest on startup so digest from V0 is invalid
V2 = 2, // added system nodes that modify the digest on startup so digest from V0 is invalid
V3 = 3 // fixed bug with casting, removed duplicate czxid usage
};
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2;
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V3;
struct ResponseForSession
{
@ -113,16 +114,7 @@ public:
uint64_t value{0};
};
static bool checkDigest(const Digest & first, const Digest & second)
{
if (first.version != second.version)
return true;
if (first.version == DigestVersion::NO_DIGEST)
return true;
return first.value == second.value;
}
static bool checkDigest(const Digest & first, const Digest & second);
static String generateDigest(const String & userdata);

View File

@ -6,6 +6,7 @@
#include <Formats/FormatSettings.h>
#include <IO/WriteBufferFromString.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/readFloatText.h>
#include <IO/Operators.h>
#include <base/find_symbols.h>

View File

@ -38,7 +38,6 @@
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/VarInt.h>
#include <pcg_random.hpp>
@ -51,6 +50,7 @@ namespace DB
template <typename Allocator>
struct Memory;
class PeekableReadBuffer;
namespace ErrorCodes
{

View File

@ -30,6 +30,7 @@ static const std::unordered_map<String, String> quantile_fuse_name_mapping =
{"quantileTDigestWeighted", "quantilesTDigestWeighted"},
{"quantileTiming", "quantilesTiming"},
{"quantileTimingWeighted", "quantilesTimingWeighted"},
{"quantileGK", "quantilesGK"},
};
String GatherFunctionQuantileData::toFusedNameOrSelf(const String & func_name)

View File

@ -7,6 +7,7 @@
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
#include <IO/PeekableReadBuffer.h>
namespace DB

View File

@ -2,6 +2,7 @@
#include <Formats/JSONUtils.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/ReadHelpers.h>
namespace DB

View File

@ -4,6 +4,7 @@
#include <Formats/FormatSettings.h>
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
#include <Processors/Formats/ISchemaReader.h>
#include <IO/PeekableReadBuffer.h>
namespace DB

View File

@ -7,6 +7,7 @@
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/PeekableReadBuffer.h>
#include <Formats/EscapingRuleUtils.h>

View File

@ -25,6 +25,7 @@
#include <IO/WriteHelpers.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/PeekableReadBuffer.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>

View File

@ -1,5 +1,6 @@
#include <algorithm>
#include <memory>
#include <stack>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>

View File

@ -4,6 +4,7 @@
<server_id>1</server_id>
<create_snapshot_on_exit>1</create_snapshot_on_exit>
<digest_enabled>1</digest_enabled>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -19,6 +19,20 @@ select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(numbe
[99,199,249,313,776]
select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
[100,200,250,314,777]
SELECT quantileGKMerge(100, 0.5)(x)
FROM
(
SELECT quantileGKState(100, 0.5)(number + 1) AS x
FROM numbers(49999)
);
24902
SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
FROM
(
SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
FROM numbers(49999)
);
[24902,44518,49999]
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select quantileGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }

View File

@ -15,6 +15,19 @@ select quantilesGK(100, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number
select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
SELECT quantileGKMerge(100, 0.5)(x)
FROM
(
SELECT quantileGKState(100, 0.5)(number + 1) AS x
FROM numbers(49999)
);
SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
FROM
(
SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
FROM numbers(49999)
);
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }

View File

@ -1 +1 @@
2024-01-01 Hello World
1

View File

@ -1,6 +1,6 @@
CREATE table if not exists table_with_dot_column (date Date, regular_column String, `other_column.2` String) ENGINE = MergeTree() ORDER BY date;
INSERT INTO table_with_dot_column select '2020-01-01', 'Hello', 'World';
INSERT INTO table_with_dot_column select '2024-01-01', 'Hello', 'World';
CREATE TABLE IF NOT EXISTS table_with_dot_column (date Date, regular_column String, `other_column.2` String) ENGINE = MergeTree() ORDER BY date;
INSERT INTO table_with_dot_column SELECT '2020-01-01', 'Hello', 'World';
INSERT INTO table_with_dot_column SELECT toDate(now() + 48*3600), 'Hello', 'World';
CREATE ROW POLICY IF NOT EXISTS row_policy ON table_with_dot_column USING toDate(date) >= today() - 30 TO ALL;
SELECT * FROM table_with_dot_column;
SELECT count(*) FROM table_with_dot_column;
DROP TABLE table_with_dot_column;