mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge remote-tracking branch 'ClickHouse/master' into query-time-ef-search
This commit is contained in:
commit
4b6b152562
@ -1,4 +1,21 @@
|
||||
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=0") # More checks in debug build.
|
||||
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
|
||||
# Enable libcxx debug mode: https://releases.llvm.org/15.0.0/projects/libcxx/docs/DesignDocs/DebugMode.html
|
||||
# The docs say the debug mode violates complexity guarantees, so do this only for Debug builds.
|
||||
# set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_ENABLE_DEBUG_MODE=1")
|
||||
# ^^ Crashes the database upon startup, needs investigation.
|
||||
# Besides that, the implementation looks like a poor man's MSAN specific to libcxx. Since CI tests MSAN
|
||||
# anyways, we can keep the debug mode disabled.
|
||||
|
||||
# Libcxx also provides extra assertions:
|
||||
# --> https://releases.llvm.org/15.0.0/projects/libcxx/docs/UsingLibcxx.html#assertions-mode
|
||||
# These look orthogonal to the debug mode but the debug mode enables them implicitly:
|
||||
# --> https://github.com/llvm/llvm-project/blob/release/15.x/libcxx/include/__assert#L29
|
||||
# They are cheap and straightforward, so enable them in debug builds:
|
||||
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_ENABLE_ASSERTIONS=1")
|
||||
|
||||
# TODO Once we upgrade to LLVM 18+, reconsider all of the above as they introduced "hardening modes":
|
||||
# https://libcxx.llvm.org/Hardening.html
|
||||
endif ()
|
||||
|
||||
add_subdirectory(contrib/libcxxabi-cmake)
|
||||
add_subdirectory(contrib/libcxx-cmake)
|
||||
|
@ -1,6 +1,9 @@
|
||||
set(ABSL_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/abseil-cpp")
|
||||
set(ABSL_COMMON_INCLUDE_DIRS "${ABSL_ROOT_DIR}")
|
||||
|
||||
# To avoid errors "'X' does not refer to a value" while using `offsetof` function.
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
|
||||
# This is a minimized version of the function definition in CMake/AbseilHelpers.cmake
|
||||
|
||||
#
|
||||
|
@ -5,6 +5,9 @@ if(NOT ENABLE_PROTOBUF)
|
||||
return()
|
||||
endif()
|
||||
|
||||
# To avoid errors "'X' does not refer to a value" while using `offsetof` function.
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
|
||||
set(Protobuf_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/google-protobuf/src")
|
||||
if(OS_FREEBSD AND SANITIZE STREQUAL "address")
|
||||
# ../contrib/protobuf/src/google/protobuf/arena_impl.h:45:10: fatal error: 'sanitizer/asan_interface.h' file not found
|
||||
|
@ -6,6 +6,8 @@ if(NOT ENABLE_GRPC)
|
||||
return()
|
||||
endif()
|
||||
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
|
||||
set(_gRPC_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/grpc")
|
||||
set(_gRPC_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/grpc")
|
||||
|
||||
|
@ -22,7 +22,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
# We want to use C++23, but GRPC is not ready
|
||||
set (CMAKE_CXX_STANDARD 20)
|
||||
set (CMAKE_CXX_STANDARD 17)
|
||||
|
||||
set(_gRPC_ZLIB_INCLUDE_DIR "")
|
||||
set(_gRPC_ZLIB_LIBRARIES ch_contrib::zlib)
|
||||
|
@ -2933,7 +2933,42 @@ The same as ‘today() - 1’.
|
||||
|
||||
## timeSlot
|
||||
|
||||
Rounds the time to the half hour.
|
||||
Round the time to the start of a half-an-hour length interval.
|
||||
|
||||
**Syntax**
|
||||
|
||||
```sql
|
||||
timeSlot(time[, time_zone])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `time` — Time to round to the start of a half-an-hour length interval. [DateTime](../data-types/datetime.md)/[Date32](../data-types/date32.md)/[DateTime64](../data-types/datetime64.md).
|
||||
- `time_zone` — A String type const value or an expression representing the time zone. [String](../data-types/string.md).
|
||||
|
||||
:::note
|
||||
Though this function can take values of the extended types `Date32` and `DateTime64` as an argument, passing it a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results.
|
||||
:::
|
||||
|
||||
**Return type**
|
||||
|
||||
- Returns the time rounded to the start of a half-an-hour length interval. [DateTime](../data-types/datetime.md).
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT timeSlot(toDateTime('2000-01-02 03:04:05', 'UTC'));
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌─timeSlot(toDateTime('2000-01-02 03:04:05', 'UTC'))─┐
|
||||
│ 2000-01-02 03:00:00 │
|
||||
└────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## toYYYYMM
|
||||
|
||||
|
@ -31,7 +31,7 @@ std::string RemoteProxyHostFetcherImpl::fetch(const Poco::URI & endpoint, const
|
||||
endpoint.toString(),
|
||||
response.getStatus(),
|
||||
response.getReason(),
|
||||
"");
|
||||
/* body_length = */ 0);
|
||||
|
||||
std::string proxy_host;
|
||||
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);
|
||||
|
@ -25,15 +25,11 @@ namespace
|
||||
* `curl` strips leading dot and accepts url gitlab.com as a match for no_proxy .gitlab.com,
|
||||
* while `wget` does an exact match.
|
||||
* */
|
||||
std::string buildPocoRegexpEntryWithoutLeadingDot(const std::string & host)
|
||||
std::string buildPocoRegexpEntryWithoutLeadingDot(std::string_view host)
|
||||
{
|
||||
std::string_view view_without_leading_dot = host;
|
||||
if (host[0] == '.')
|
||||
{
|
||||
view_without_leading_dot = std::string_view {host.begin() + 1u, host.end()};
|
||||
}
|
||||
|
||||
return RE2::QuoteMeta(view_without_leading_dot);
|
||||
if (host.starts_with('.'))
|
||||
host.remove_prefix(1);
|
||||
return RE2::QuoteMeta(host);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -170,6 +170,9 @@ Avoid reordering rows when reading from Parquet files. Usually makes it much slo
|
||||
)", 0) \
|
||||
M(Bool, input_format_parquet_filter_push_down, true, R"(
|
||||
When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata.
|
||||
)", 0) \
|
||||
M(Bool, input_format_parquet_bloom_filter_push_down, false, R"(
|
||||
When reading Parquet files, skip whole row groups based on the WHERE expressions and bloom filter in the Parquet metadata.
|
||||
)", 0) \
|
||||
M(Bool, input_format_parquet_use_native_reader, false, R"(
|
||||
When reading Parquet files, to use native reader instead of arrow reader.
|
||||
|
@ -102,6 +102,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"},
|
||||
{"max_parts_to_move", 1000, 1000, "New setting"},
|
||||
{"hnsw_candidate_list_size_for_search", 0, 0, "New setting"},
|
||||
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
|
||||
}
|
||||
},
|
||||
{"24.9",
|
||||
|
@ -36,8 +36,8 @@ public:
|
||||
|
||||
auto findByValue(const T & value) const
|
||||
{
|
||||
const auto it = value_to_name_map.find(value);
|
||||
if (it == std::end(value_to_name_map))
|
||||
auto it = value_to_name_map.find(value);
|
||||
if (it == value_to_name_map.end())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected value {} in enum", toString(value));
|
||||
|
||||
return it;
|
||||
@ -58,7 +58,7 @@ public:
|
||||
bool getNameForValue(const T & value, StringRef & result) const
|
||||
{
|
||||
const auto it = value_to_name_map.find(value);
|
||||
if (it == std::end(value_to_name_map))
|
||||
if (it == value_to_name_map.end())
|
||||
return false;
|
||||
|
||||
result = it->second;
|
||||
|
@ -321,6 +321,8 @@ bool isUInt8(TYPE data_type) { return WhichDataType(data_type).isUInt8(); } \
|
||||
bool isUInt16(TYPE data_type) { return WhichDataType(data_type).isUInt16(); } \
|
||||
bool isUInt32(TYPE data_type) { return WhichDataType(data_type).isUInt32(); } \
|
||||
bool isUInt64(TYPE data_type) { return WhichDataType(data_type).isUInt64(); } \
|
||||
bool isUInt128(TYPE data_type) { return WhichDataType(data_type).isUInt128(); } \
|
||||
bool isUInt256(TYPE data_type) { return WhichDataType(data_type).isUInt256(); } \
|
||||
bool isNativeUInt(TYPE data_type) { return WhichDataType(data_type).isNativeUInt(); } \
|
||||
bool isUInt(TYPE data_type) { return WhichDataType(data_type).isUInt(); } \
|
||||
\
|
||||
@ -328,6 +330,8 @@ bool isInt8(TYPE data_type) { return WhichDataType(data_type).isInt8(); } \
|
||||
bool isInt16(TYPE data_type) { return WhichDataType(data_type).isInt16(); } \
|
||||
bool isInt32(TYPE data_type) { return WhichDataType(data_type).isInt32(); } \
|
||||
bool isInt64(TYPE data_type) { return WhichDataType(data_type).isInt64(); } \
|
||||
bool isInt128(TYPE data_type) { return WhichDataType(data_type).isInt128(); } \
|
||||
bool isInt256(TYPE data_type) { return WhichDataType(data_type).isInt256(); } \
|
||||
bool isNativeInt(TYPE data_type) { return WhichDataType(data_type).isNativeInt(); } \
|
||||
bool isInt(TYPE data_type) { return WhichDataType(data_type).isInt(); } \
|
||||
\
|
||||
|
@ -457,7 +457,9 @@ struct WhichDataType
|
||||
bool isUInt8(TYPE data_type); \
|
||||
bool isUInt16(TYPE data_type); \
|
||||
bool isUInt32(TYPE data_type); \
|
||||
bool isUInt64(TYPE data_type); \
|
||||
bool isUInt64(TYPE data_type);\
|
||||
bool isUInt128(TYPE data_type);\
|
||||
bool isUInt256(TYPE data_type); \
|
||||
bool isNativeUInt(TYPE data_type); \
|
||||
bool isUInt(TYPE data_type); \
|
||||
\
|
||||
@ -465,6 +467,8 @@ bool isInt8(TYPE data_type); \
|
||||
bool isInt16(TYPE data_type); \
|
||||
bool isInt32(TYPE data_type); \
|
||||
bool isInt64(TYPE data_type); \
|
||||
bool isInt128(TYPE data_type); \
|
||||
bool isInt256(TYPE data_type); \
|
||||
bool isNativeInt(TYPE data_type); \
|
||||
bool isInt(TYPE data_type); \
|
||||
\
|
||||
|
@ -111,9 +111,9 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
/// RWF_NOWAIT flag may return 0 even when not at end of file.
|
||||
/// It can't be distinguished from the real eof, so we have to
|
||||
/// disable pread with nowait.
|
||||
static std::atomic<bool> has_pread_nowait_support = !hasBugInPreadV2();
|
||||
static const bool has_pread_nowait_support = !hasBugInPreadV2();
|
||||
|
||||
if (has_pread_nowait_support.load(std::memory_order_relaxed))
|
||||
if (has_pread_nowait_support)
|
||||
{
|
||||
/// It reports real time spent including the time spent while thread was preempted doing nothing.
|
||||
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
|
||||
@ -161,7 +161,8 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
if (errno == ENOSYS || errno == EOPNOTSUPP)
|
||||
{
|
||||
/// No support for the syscall or the flag in the Linux kernel.
|
||||
has_pread_nowait_support.store(false, std::memory_order_relaxed);
|
||||
/// It shouldn't happen because we check the kernel version but let's
|
||||
/// fallback to the thread pool.
|
||||
break;
|
||||
}
|
||||
if (errno == EAGAIN)
|
||||
|
@ -92,12 +92,26 @@ std::unique_ptr<S3::Client> getClient(
|
||||
"Region should be explicitly specified for directory buckets");
|
||||
}
|
||||
|
||||
const Settings & local_settings = context->getSettingsRef();
|
||||
|
||||
int s3_max_redirects = static_cast<int>(global_settings[Setting::s3_max_redirects]);
|
||||
if (!for_disk_s3 && local_settings.isChanged("s3_max_redirects"))
|
||||
s3_max_redirects = static_cast<int>(local_settings[Setting::s3_max_redirects]);
|
||||
|
||||
int s3_retry_attempts = static_cast<int>(global_settings[Setting::s3_retry_attempts]);
|
||||
if (!for_disk_s3 && local_settings.isChanged("s3_retry_attempts"))
|
||||
s3_retry_attempts = static_cast<int>(local_settings[Setting::s3_retry_attempts]);
|
||||
|
||||
bool enable_s3_requests_logging = global_settings[Setting::enable_s3_requests_logging];
|
||||
if (!for_disk_s3 && local_settings.isChanged("enable_s3_requests_logging"))
|
||||
enable_s3_requests_logging = local_settings[Setting::enable_s3_requests_logging];
|
||||
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
auth_settings.region,
|
||||
context->getRemoteHostFilter(),
|
||||
static_cast<int>(global_settings[Setting::s3_max_redirects]),
|
||||
static_cast<int>(global_settings[Setting::s3_retry_attempts]),
|
||||
global_settings[Setting::enable_s3_requests_logging],
|
||||
s3_max_redirects,
|
||||
s3_retry_attempts,
|
||||
enable_s3_requests_logging,
|
||||
for_disk_s3,
|
||||
request_settings.get_request_throttler,
|
||||
request_settings.put_request_throttler,
|
||||
|
@ -191,6 +191,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
|
||||
format_settings.parquet.case_insensitive_column_matching = settings[Setting::input_format_parquet_case_insensitive_column_matching];
|
||||
format_settings.parquet.preserve_order = settings[Setting::input_format_parquet_preserve_order];
|
||||
format_settings.parquet.filter_push_down = settings[Setting::input_format_parquet_filter_push_down];
|
||||
format_settings.parquet.bloom_filter_push_down = settings[Setting::input_format_parquet_bloom_filter_push_down];
|
||||
format_settings.parquet.use_native_reader = settings[Setting::input_format_parquet_use_native_reader];
|
||||
format_settings.parquet.allow_missing_columns = settings[Setting::input_format_parquet_allow_missing_columns];
|
||||
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings[Setting::input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference];
|
||||
|
@ -274,6 +274,7 @@ struct FormatSettings
|
||||
bool skip_columns_with_unsupported_types_in_schema_inference = false;
|
||||
bool case_insensitive_column_matching = false;
|
||||
bool filter_push_down = true;
|
||||
bool bloom_filter_push_down = true;
|
||||
bool use_native_reader = false;
|
||||
std::unordered_set<int> skip_row_groups = {};
|
||||
bool output_string_as_string = false;
|
||||
|
@ -84,11 +84,9 @@ void assertResponseIsOk(const String & uri, Poco::Net::HTTPResponse & response,
|
||||
? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
|
||||
: ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
|
||||
|
||||
std::stringstream body; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
body.exceptions(std::ios::failbit);
|
||||
body << istr.rdbuf();
|
||||
|
||||
throw HTTPException(code, uri, status, response.getReason(), body.str());
|
||||
istr.seekg(0, std::ios::end);
|
||||
size_t body_length = istr.tellg();
|
||||
throw HTTPException(code, uri, status, response.getReason(), body_length);
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,13 +95,13 @@ Exception HTTPException::makeExceptionMessage(
|
||||
const std::string & uri,
|
||||
Poco::Net::HTTPResponse::HTTPStatus http_status,
|
||||
const std::string & reason,
|
||||
const std::string & body)
|
||||
size_t body_length)
|
||||
{
|
||||
return Exception(code,
|
||||
"Received error from remote server {}. "
|
||||
"HTTP status code: {} {}, "
|
||||
"body: {}",
|
||||
uri, static_cast<int>(http_status), reason, body);
|
||||
"HTTP status code: {} '{}', "
|
||||
"body length: {} bytes",
|
||||
uri, static_cast<int>(http_status), reason, body_length);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -27,9 +27,9 @@ public:
|
||||
const std::string & uri,
|
||||
Poco::Net::HTTPResponse::HTTPStatus http_status_,
|
||||
const std::string & reason,
|
||||
const std::string & body
|
||||
size_t body_length = 0
|
||||
)
|
||||
: Exception(makeExceptionMessage(code, uri, http_status_, reason, body))
|
||||
: Exception(makeExceptionMessage(code, uri, http_status_, reason, body_length))
|
||||
, http_status(http_status_)
|
||||
{}
|
||||
|
||||
@ -46,7 +46,7 @@ private:
|
||||
const std::string & uri,
|
||||
Poco::Net::HTTPResponse::HTTPStatus http_status,
|
||||
const std::string & reason,
|
||||
const std::string & body);
|
||||
size_t body_length);
|
||||
|
||||
const char * name() const noexcept override { return "DB::HTTPException"; }
|
||||
const char * className() const noexcept override { return "DB::HTTPException"; }
|
||||
|
@ -423,8 +423,7 @@ std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::initialize()
|
||||
ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
|
||||
current_uri.toString(),
|
||||
Poco::Net::HTTPResponse::HTTP_REQUESTED_RANGE_NOT_SATISFIABLE,
|
||||
reason,
|
||||
"");
|
||||
reason);
|
||||
}
|
||||
throw Exception(
|
||||
ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
|
||||
@ -549,8 +548,7 @@ size_t ReadWriteBufferFromHTTP::readBigAt(char * to, size_t n, size_t offset, co
|
||||
ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
|
||||
current_uri.toString(),
|
||||
Poco::Net::HTTPResponse::HTTP_REQUESTED_RANGE_NOT_SATISFIABLE,
|
||||
reason,
|
||||
"");
|
||||
reason);
|
||||
}
|
||||
|
||||
copyFromIStreamWithProgressCallback(*result.response_stream, to, n, progress_callback, &bytes_copied, &is_canceled);
|
||||
|
@ -238,6 +238,8 @@ public:
|
||||
|
||||
const Columns & getOrderedSet() const { return ordered_set; }
|
||||
|
||||
const std::vector<KeyTuplePositionMapping> & getIndexesMapping() const { return indexes_mapping; }
|
||||
|
||||
private:
|
||||
// If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element.
|
||||
bool has_all_keys;
|
||||
|
@ -274,7 +274,8 @@ size_t IRowInputFormat::countRows(size_t)
|
||||
|
||||
void IRowInputFormat::setSerializationHints(const SerializationInfoByName & hints)
|
||||
{
|
||||
serializations = getPort().getHeader().getSerializations(hints);
|
||||
if (supportsCustomSerializations())
|
||||
serializations = getPort().getHeader().getSerializations(hints);
|
||||
}
|
||||
|
||||
|
||||
|
@ -59,6 +59,7 @@ protected:
|
||||
/// `max_block_size` can be ignored.
|
||||
virtual size_t countRows(size_t max_block_size);
|
||||
virtual bool supportsCountRows() const { return false; }
|
||||
virtual bool supportsCustomSerializations() const { return false; }
|
||||
|
||||
virtual void readPrefix() {} /// delimiter before begin of result
|
||||
virtual void readSuffix() {} /// delimiter after end of result
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <arrow/type_fwd.h>
|
||||
#include <boost/algorithm/string/case_conv.hpp>
|
||||
#include <Common/Exception.h>
|
||||
#include <parquet/metadata.h>
|
||||
|
||||
|
||||
namespace arrow
|
||||
@ -65,11 +66,22 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
// For a parquet schema {x: {i: int, j: int}}, this should be populated as follows
|
||||
// clickhouse_index = 0, parquet_indexes = {0, 1}
|
||||
struct ClickHouseIndexToParquetIndex
|
||||
{
|
||||
std::size_t clickhouse_index;
|
||||
std::vector<int> parquet_indexes;
|
||||
};
|
||||
|
||||
/// Only collect the required fields' indices. Eg. when just read a field of a struct,
|
||||
/// don't need to collect the whole indices in this struct.
|
||||
std::vector<int> findRequiredIndices(const Block & header, const arrow::Schema & schema)
|
||||
std::vector<ClickHouseIndexToParquetIndex> findRequiredIndices(
|
||||
const Block & header,
|
||||
const arrow::Schema & schema,
|
||||
const parquet::FileMetaData & file)
|
||||
{
|
||||
std::vector<int> required_indices;
|
||||
std::vector<ClickHouseIndexToParquetIndex> required_indices;
|
||||
std::unordered_set<int> added_indices;
|
||||
/// Flat all named fields' index information into a map.
|
||||
auto fields_indices = calculateFieldIndices(schema);
|
||||
@ -79,7 +91,7 @@ public:
|
||||
std::string col_name = named_col.name;
|
||||
if (ignore_case)
|
||||
boost::to_lower(col_name);
|
||||
findRequiredIndices(col_name, named_col.type, fields_indices, added_indices, required_indices);
|
||||
findRequiredIndices(col_name, i, named_col.type, fields_indices, added_indices, required_indices, file);
|
||||
}
|
||||
return required_indices;
|
||||
}
|
||||
@ -169,10 +181,12 @@ private:
|
||||
|
||||
void findRequiredIndices(
|
||||
const String & name,
|
||||
std::size_t header_index,
|
||||
DataTypePtr data_type,
|
||||
const std::unordered_map<std::string, std::pair<int, int>> & field_indices,
|
||||
std::unordered_set<int> & added_indices,
|
||||
std::vector<int> & required_indices)
|
||||
std::vector<ClickHouseIndexToParquetIndex> & required_indices,
|
||||
const parquet::FileMetaData & file)
|
||||
{
|
||||
auto nested_type = removeNullable(data_type);
|
||||
if (const DB::DataTypeTuple * type_tuple = typeid_cast<const DB::DataTypeTuple *>(nested_type.get()))
|
||||
@ -187,20 +201,20 @@ private:
|
||||
if (ignore_case)
|
||||
boost::to_lower(field_name);
|
||||
const auto & field_type = field_types[i];
|
||||
findRequiredIndices(Nested::concatenateName(name, field_name), field_type, field_indices, added_indices, required_indices);
|
||||
findRequiredIndices(Nested::concatenateName(name, field_name), header_index, field_type, field_indices, added_indices, required_indices, file);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (const auto * type_array = typeid_cast<const DB::DataTypeArray *>(nested_type.get()))
|
||||
{
|
||||
findRequiredIndices(name, type_array->getNestedType(), field_indices, added_indices, required_indices);
|
||||
findRequiredIndices(name, header_index, type_array->getNestedType(), field_indices, added_indices, required_indices, file);
|
||||
return;
|
||||
}
|
||||
else if (const auto * type_map = typeid_cast<const DB::DataTypeMap *>(nested_type.get()))
|
||||
{
|
||||
findRequiredIndices(name, type_map->getKeyType(), field_indices, added_indices, required_indices);
|
||||
findRequiredIndices(name, type_map->getValueType(), field_indices, added_indices, required_indices);
|
||||
findRequiredIndices(name, header_index, type_map->getKeyType(), field_indices, added_indices, required_indices, file);
|
||||
findRequiredIndices(name, header_index, type_map->getValueType(), field_indices, added_indices, required_indices, file);
|
||||
return;
|
||||
}
|
||||
auto it = field_indices.find(name);
|
||||
@ -211,14 +225,18 @@ private:
|
||||
}
|
||||
else
|
||||
{
|
||||
ClickHouseIndexToParquetIndex index_mapping;
|
||||
index_mapping.clickhouse_index = header_index;
|
||||
for (int j = 0; j < it->second.second; ++j)
|
||||
{
|
||||
auto index = it->second.first + j;
|
||||
if (added_indices.insert(index).second)
|
||||
{
|
||||
required_indices.emplace_back(index);
|
||||
index_mapping.parquet_indexes.emplace_back(index);
|
||||
}
|
||||
}
|
||||
|
||||
required_indices.emplace_back(index_mapping);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -43,6 +43,7 @@ private:
|
||||
|
||||
size_t countRows(size_t max_block_size) override;
|
||||
bool supportsCountRows() const override { return true; }
|
||||
bool supportsCustomSerializations() const override { return true; }
|
||||
|
||||
const String & columnName(size_t i) const;
|
||||
size_t columnIndex(StringRef name, size_t key_index);
|
||||
|
@ -0,0 +1,525 @@
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
|
||||
#include <iostream>
|
||||
|
||||
#if USE_PARQUET
|
||||
|
||||
#include <parquet/bloom_filter.h>
|
||||
#include <parquet/xxhasher.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
bool isParquetStringTypeSupportedForBloomFilters(
|
||||
const std::shared_ptr<const parquet::LogicalType> & logical_type,
|
||||
parquet::ConvertedType::type converted_type)
|
||||
{
|
||||
if (logical_type &&
|
||||
!logical_type->is_none()
|
||||
&& !(logical_type->is_string() || logical_type->is_BSON() || logical_type->is_JSON()))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parquet::ConvertedType::type::NONE != converted_type &&
|
||||
!(converted_type == parquet::ConvertedType::JSON || converted_type == parquet::ConvertedType::UTF8
|
||||
|| converted_type == parquet::ConvertedType::BSON))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool isParquetIntegerTypeSupportedForBloomFilters(const std::shared_ptr<const parquet::LogicalType> & logical_type, parquet::ConvertedType::type converted_type)
|
||||
{
|
||||
if (logical_type && !logical_type->is_none() && !logical_type->is_int())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parquet::ConvertedType::type::NONE != converted_type && !(converted_type == parquet::ConvertedType::INT_8 || converted_type == parquet::ConvertedType::INT_16
|
||||
|| converted_type == parquet::ConvertedType::INT_32 || converted_type == parquet::ConvertedType::INT_64
|
||||
|| converted_type == parquet::ConvertedType::UINT_8 || converted_type == parquet::ConvertedType::UINT_16
|
||||
|| converted_type == parquet::ConvertedType::UINT_32 || converted_type == parquet::ConvertedType::UINT_64))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
uint64_t hashSpecialFLBATypes(const Field & field)
|
||||
{
|
||||
const T & value = field.safeGet<T>();
|
||||
|
||||
parquet::FLBA flba(reinterpret_cast<const uint8_t*>(&value));
|
||||
|
||||
parquet::XxHasher hasher;
|
||||
|
||||
return hasher.Hash(&flba, sizeof(T));
|
||||
};
|
||||
|
||||
std::optional<uint64_t> tryHashStringWithoutCompatibilityCheck(const Field & field)
|
||||
{
|
||||
const auto field_type = field.getType();
|
||||
|
||||
if (field_type != Field::Types::Which::String)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
parquet::XxHasher hasher;
|
||||
parquet::ByteArray ba { field.safeGet<std::string>() };
|
||||
|
||||
return hasher.Hash(&ba);
|
||||
}
|
||||
|
||||
std::optional<uint64_t> tryHashString(
|
||||
const Field & field,
|
||||
const std::shared_ptr<const parquet::LogicalType> & logical_type,
|
||||
parquet::ConvertedType::type converted_type)
|
||||
{
|
||||
if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type))
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return tryHashStringWithoutCompatibilityCheck(field);
|
||||
}
|
||||
|
||||
std::optional<uint64_t> tryHashFLBA(
|
||||
const Field & field,
|
||||
const std::shared_ptr<const parquet::LogicalType> & logical_type,
|
||||
parquet::ConvertedType::type converted_type,
|
||||
std::size_t parquet_column_length)
|
||||
{
|
||||
if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type))
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
const auto field_type = field.getType();
|
||||
|
||||
if (field_type == Field::Types::Which::IPv6 && parquet_column_length == sizeof(IPv6))
|
||||
{
|
||||
return hashSpecialFLBATypes<IPv6>(field);
|
||||
}
|
||||
|
||||
return tryHashStringWithoutCompatibilityCheck(field);
|
||||
}
|
||||
|
||||
template <typename ParquetPhysicalType>
|
||||
std::optional<uint64_t> tryHashInt(const Field & field, const std::shared_ptr<const parquet::LogicalType> & logical_type, parquet::ConvertedType::type converted_type)
|
||||
{
|
||||
if (!isParquetIntegerTypeSupportedForBloomFilters(logical_type, converted_type))
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
parquet::XxHasher hasher;
|
||||
|
||||
if (field.getType() == Field::Types::Which::Int64)
|
||||
{
|
||||
return hasher.Hash(static_cast<ParquetPhysicalType>(field.safeGet<int64_t>()));
|
||||
}
|
||||
else if (field.getType() == Field::Types::Which::UInt64)
|
||||
{
|
||||
return hasher.Hash(static_cast<ParquetPhysicalType>(field.safeGet<uint64_t>()));
|
||||
}
|
||||
else if (field.getType() == Field::Types::IPv4)
|
||||
{
|
||||
/*
|
||||
* In theory, we could accept IPv4 over 64 bits variables. It would only be a problem in case it was hashed using the byte array api
|
||||
* with a zero-ed buffer that had a 32 bits variable copied into it.
|
||||
*
|
||||
* To be on the safe side, accept only in case physical type is 32 bits.
|
||||
* */
|
||||
if constexpr (std::is_same_v<int32_t, ParquetPhysicalType>)
|
||||
{
|
||||
return hasher.Hash(static_cast<ParquetPhysicalType>(field.safeGet<IPv4>()));
|
||||
}
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<uint64_t> tryHash(const Field & field, const parquet::ColumnDescriptor * parquet_column_descriptor)
|
||||
{
|
||||
const auto physical_type = parquet_column_descriptor->physical_type();
|
||||
const auto & logical_type = parquet_column_descriptor->logical_type();
|
||||
const auto converted_type = parquet_column_descriptor->converted_type();
|
||||
|
||||
switch (physical_type)
|
||||
{
|
||||
case parquet::Type::type::INT32:
|
||||
return tryHashInt<int32_t>(field, logical_type, converted_type);
|
||||
case parquet::Type::type::INT64:
|
||||
return tryHashInt<int64_t>(field, logical_type, converted_type);
|
||||
case parquet::Type::type::BYTE_ARRAY:
|
||||
return tryHashString(field, logical_type, converted_type);
|
||||
case parquet::Type::type::FIXED_LEN_BYTE_ARRAY:
|
||||
return tryHashFLBA(field, logical_type, converted_type, parquet_column_descriptor->type_length());
|
||||
default:
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::vector<uint64_t>> hash(const IColumn * data_column, const parquet::ColumnDescriptor * parquet_column_descriptor)
|
||||
{
|
||||
std::vector<uint64_t> hashes;
|
||||
|
||||
for (size_t i = 0u; i < data_column->size(); i++)
|
||||
{
|
||||
Field f;
|
||||
data_column->get(i, f);
|
||||
|
||||
auto hashed_value = tryHash(f, parquet_column_descriptor);
|
||||
|
||||
if (!hashed_value)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
hashes.emplace_back(*hashed_value);
|
||||
}
|
||||
|
||||
return hashes;
|
||||
}
|
||||
|
||||
bool maybeTrueOnBloomFilter(const std::vector<uint64_t> & hashes, const std::unique_ptr<parquet::BloomFilter> & bloom_filter)
|
||||
{
|
||||
for (const auto hash : hashes)
|
||||
{
|
||||
if (bloom_filter->FindHash(hash))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent(
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
std::size_t clickhouse_column_index)
|
||||
{
|
||||
if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes;
|
||||
|
||||
// complex types like structs, tuples and maps will have more than one index.
|
||||
// we don't support those for now
|
||||
if (parquet_indexes.size() > 1)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (parquet_indexes.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Something bad happened, raise an issue and try the query with `input_format_parquet_bloom_filter_push_down=false`");
|
||||
}
|
||||
|
||||
auto parquet_column_index = parquet_indexes[0];
|
||||
|
||||
const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index);
|
||||
|
||||
bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value();
|
||||
if (!column_has_bloom_filter)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return parquet_column_descriptor;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ParquetBloomFilterCondition::ParquetBloomFilterCondition(const std::vector<ConditionElement> & condition_, const Block & header_)
|
||||
: condition(condition_), header(header_)
|
||||
{
|
||||
}
|
||||
|
||||
bool ParquetBloomFilterCondition::mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const
|
||||
{
|
||||
using Function = ConditionElement::Function;
|
||||
std::vector<BoolMask> rpn_stack;
|
||||
|
||||
for (const auto & element : condition)
|
||||
{
|
||||
if (element.function == Function::FUNCTION_IN
|
||||
|| element.function == Function::FUNCTION_NOT_IN)
|
||||
{
|
||||
bool maybe_true = true;
|
||||
for (auto column_index = 0u; column_index < element.hashes_per_column.size(); column_index++)
|
||||
{
|
||||
// in case bloom filter is not present for this row group
|
||||
// https://github.com/ClickHouse/ClickHouse/pull/62966#discussion_r1722361237
|
||||
if (!column_index_to_column_bf.contains(element.key_columns[column_index]))
|
||||
{
|
||||
rpn_stack.emplace_back(true, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool column_maybe_contains = maybeTrueOnBloomFilter(
|
||||
element.hashes_per_column[column_index],
|
||||
column_index_to_column_bf.at(element.key_columns[column_index]));
|
||||
|
||||
if (!column_maybe_contains)
|
||||
{
|
||||
maybe_true = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
rpn_stack.emplace_back(maybe_true, true);
|
||||
if (element.function == Function::FUNCTION_NOT_IN)
|
||||
rpn_stack.back() = !rpn_stack.back();
|
||||
}
|
||||
else if (element.function == Function::FUNCTION_NOT)
|
||||
{
|
||||
rpn_stack.back() = !rpn_stack.back();
|
||||
}
|
||||
else if (element.function == Function::FUNCTION_OR)
|
||||
{
|
||||
auto arg1 = rpn_stack.back();
|
||||
rpn_stack.pop_back();
|
||||
auto arg2 = rpn_stack.back();
|
||||
rpn_stack.back() = arg1 | arg2;
|
||||
}
|
||||
else if (element.function == Function::FUNCTION_AND)
|
||||
{
|
||||
auto arg1 = rpn_stack.back();
|
||||
rpn_stack.pop_back();
|
||||
auto arg2 = rpn_stack.back();
|
||||
rpn_stack.back() = arg1 & arg2;
|
||||
}
|
||||
else if (element.function == Function::ALWAYS_TRUE)
|
||||
{
|
||||
rpn_stack.emplace_back(true, false);
|
||||
}
|
||||
else if (element.function == Function::ALWAYS_FALSE)
|
||||
{
|
||||
rpn_stack.emplace_back(false, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
rpn_stack.emplace_back(true, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (rpn_stack.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::mayBeTrueOnRowGroup");
|
||||
|
||||
return rpn_stack[0].can_be_true;
|
||||
}
|
||||
|
||||
std::unordered_set<std::size_t> ParquetBloomFilterCondition::getFilteringColumnKeys() const
|
||||
{
|
||||
std::unordered_set<std::size_t> column_keys;
|
||||
|
||||
for (const auto & element : condition)
|
||||
{
|
||||
for (const auto index : element.key_columns)
|
||||
{
|
||||
column_keys.insert(index);
|
||||
}
|
||||
}
|
||||
|
||||
return column_keys;
|
||||
}
|
||||
|
||||
/*
|
||||
* `KeyCondition::rpn` is overly complex for bloom filters, some operations are not even supported. Not only that, but to avoid hashing each time
|
||||
* we loop over a rpn element, we need to store hashes instead of where predicate values. To address this, we loop over `KeyCondition::rpn`
|
||||
* and build a simplified RPN that holds hashes instead of values.
|
||||
*
|
||||
* `KeyCondition::RPNElement::FUNCTION_IN_RANGE` becomes:
|
||||
* `FUNCTION_IN`
|
||||
* `FUNCTION_UNKNOWN` when range limits are different
|
||||
* `KeyCondition::RPNElement::FUNCTION_IN_SET` becomes
|
||||
* `FUNCTION_IN`
|
||||
*
|
||||
* Complex types and structs are not supported.
|
||||
* There are two sources of data types being analyzed, and they need to be compatible: DB::Field type and parquet type.
|
||||
* This is determined by the `isColumnSupported` method.
|
||||
*
|
||||
* Some interesting examples:
|
||||
* 1. file(..., 'str_column UInt64') where str_column = 50; Field.type == UInt64. Parquet type string. Not supported.
|
||||
* 2. file(...) where str_column = 50; Field.type == String (conversion already taken care by `KeyCondition`). Parquet type string.
|
||||
* 3. file(...) where uint32_column = toIPv4(5). Field.type == IPv4. Incompatible column types, resolved by `KeyCondition` itself.
|
||||
* 4. file(...) where toIPv4(uint32_column) = toIPv4(5). Field.type == IPv4. We know it is safe to hash it using an int32 API.
|
||||
* */
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParquetBloomFilterCondition(
|
||||
const std::vector<KeyCondition::RPNElement> & rpn,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata)
|
||||
{
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> condition_elements;
|
||||
|
||||
using RPNElement = KeyCondition::RPNElement;
|
||||
using Function = ParquetBloomFilterCondition::ConditionElement::Function;
|
||||
|
||||
for (const auto & rpn_element : rpn)
|
||||
{
|
||||
// this would be a problem for `where negate(x) = -58`.
|
||||
// It would perform a bf search on `-58`, and possibly miss row groups containing this data.
|
||||
if (!rpn_element.monotonic_functions_chain.empty())
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
ParquetBloomFilterCondition::ConditionElement::HashesForColumns hashes;
|
||||
|
||||
if (rpn_element.function == RPNElement::FUNCTION_IN_RANGE
|
||||
|| rpn_element.function == RPNElement::FUNCTION_NOT_IN_RANGE)
|
||||
{
|
||||
// Only FUNCTION_EQUALS is supported and for that extremes need to be the same
|
||||
if (rpn_element.range.left != rpn_element.range.right)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto * parquet_column_descriptor =
|
||||
getColumnDescriptorIfBloomFilterIsPresent(parquet_rg_metadata, clickhouse_column_index_to_parquet_index, rpn_element.key_column);
|
||||
|
||||
if (!parquet_column_descriptor)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto hashed_value = tryHash(rpn_element.range.left, parquet_column_descriptor);
|
||||
|
||||
if (!hashed_value)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
std::vector<uint64_t> hashes_for_column;
|
||||
hashes_for_column.emplace_back(*hashed_value);
|
||||
|
||||
hashes.emplace_back(std::move(hashes_for_column));
|
||||
|
||||
auto function = rpn_element.function == RPNElement::FUNCTION_IN_RANGE
|
||||
? ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_IN
|
||||
: ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_NOT_IN;
|
||||
|
||||
std::vector<std::size_t> key_columns;
|
||||
key_columns.emplace_back(rpn_element.key_column);
|
||||
|
||||
condition_elements.emplace_back(function, std::move(hashes), std::move(key_columns));
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_IN_SET
|
||||
|| rpn_element.function == RPNElement::FUNCTION_NOT_IN_SET)
|
||||
{
|
||||
const auto & set_index = rpn_element.set_index;
|
||||
const auto & ordered_set = set_index->getOrderedSet();
|
||||
const auto & indexes_mapping = set_index->getIndexesMapping();
|
||||
bool found_empty_column = false;
|
||||
|
||||
std::vector<std::size_t> key_columns;
|
||||
|
||||
for (auto i = 0u; i < ordered_set.size(); i++)
|
||||
{
|
||||
const auto & set_column = ordered_set[i];
|
||||
|
||||
const auto * parquet_column_descriptor = getColumnDescriptorIfBloomFilterIsPresent(
|
||||
parquet_rg_metadata,
|
||||
clickhouse_column_index_to_parquet_index,
|
||||
indexes_mapping[i].key_index);
|
||||
|
||||
if (!parquet_column_descriptor)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
auto column = set_column;
|
||||
|
||||
if (column->empty())
|
||||
{
|
||||
found_empty_column = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (const auto & nullable_column = checkAndGetColumn<ColumnNullable>(set_column.get()))
|
||||
{
|
||||
column = nullable_column->getNestedColumnPtr();
|
||||
}
|
||||
|
||||
auto hashes_for_column_opt = hash(column.get(), parquet_column_descriptor);
|
||||
|
||||
if (!hashes_for_column_opt)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
auto & hashes_for_column = *hashes_for_column_opt;
|
||||
|
||||
if (hashes_for_column.empty())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
hashes.emplace_back(hashes_for_column);
|
||||
|
||||
key_columns.push_back(indexes_mapping[i].key_index);
|
||||
}
|
||||
|
||||
if (found_empty_column)
|
||||
{
|
||||
condition_elements.emplace_back(Function::ALWAYS_FALSE);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hashes.empty())
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto function = RPNElement::FUNCTION_IN_SET == rpn_element.function ? Function::FUNCTION_IN : Function::FUNCTION_NOT_IN;
|
||||
|
||||
condition_elements.emplace_back(function, hashes, key_columns);
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_NOT)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_NOT);
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_OR)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_OR);
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_AND)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_AND);
|
||||
}
|
||||
else
|
||||
{
|
||||
condition_elements.emplace_back(Function::ALWAYS_TRUE);
|
||||
}
|
||||
}
|
||||
|
||||
return condition_elements;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -0,0 +1,73 @@
|
||||
#pragma once
|
||||
|
||||
#include <config.h>
|
||||
|
||||
#if USE_PARQUET
|
||||
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
#include <parquet/metadata.h>
|
||||
#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
|
||||
|
||||
namespace parquet
|
||||
{
|
||||
class BloomFilter;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ParquetBloomFilterCondition
|
||||
{
|
||||
public:
|
||||
|
||||
struct ConditionElement
|
||||
{
|
||||
enum Function
|
||||
{
|
||||
/// Atoms of a Boolean expression.
|
||||
FUNCTION_IN,
|
||||
FUNCTION_NOT_IN,
|
||||
/// Can take any value.
|
||||
FUNCTION_UNKNOWN,
|
||||
/// Operators of the logical expression.
|
||||
FUNCTION_NOT,
|
||||
FUNCTION_AND,
|
||||
FUNCTION_OR,
|
||||
/// Constants
|
||||
ALWAYS_FALSE,
|
||||
ALWAYS_TRUE,
|
||||
};
|
||||
|
||||
using ColumnPtr = IColumn::Ptr;
|
||||
using HashesForColumns = std::vector<std::vector<uint64_t>>;
|
||||
using KeyColumns = std::vector<std::size_t>;
|
||||
|
||||
Function function;
|
||||
// each entry represents a list of hashes per column
|
||||
// suppose there are three columns with 2 rows each
|
||||
// hashes_per_column.size() == 3 and hashes_per_column[0].size() == 2
|
||||
HashesForColumns hashes_per_column;
|
||||
KeyColumns key_columns;
|
||||
};
|
||||
|
||||
using RPNElement = KeyCondition::RPNElement;
|
||||
using ColumnIndexToBF = std::unordered_map<std::size_t, std::unique_ptr<parquet::BloomFilter>>;
|
||||
|
||||
explicit ParquetBloomFilterCondition(const std::vector<ConditionElement> & condition_, const Block & header_);
|
||||
|
||||
bool mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const;
|
||||
std::unordered_set<std::size_t> getFilteringColumnKeys() const;
|
||||
|
||||
private:
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> condition;
|
||||
Block header;
|
||||
};
|
||||
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParquetBloomFilterCondition(
|
||||
const std::vector<KeyCondition::RPNElement> & rpn,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -14,6 +14,8 @@
|
||||
#include <arrow/status.h>
|
||||
#include <parquet/arrow/reader.h>
|
||||
#include <parquet/arrow/schema.h>
|
||||
#include <parquet/bloom_filter.h>
|
||||
#include <parquet/bloom_filter_reader.h>
|
||||
#include <parquet/file_reader.h>
|
||||
#include <parquet/statistics.h>
|
||||
#include "ArrowBufferedStreams.h"
|
||||
@ -25,6 +27,7 @@
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Common/FieldVisitorsAccurateComparison.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetRecordReader.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
|
||||
namespace CurrentMetrics
|
||||
@ -263,6 +266,50 @@ static Field decodePlainParquetValueSlow(const std::string & data, parquet::Type
|
||||
return field;
|
||||
}
|
||||
|
||||
static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF(
|
||||
parquet::BloomFilterReader & bf_reader,
|
||||
int row_group,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unordered_set<std::size_t> & filtering_columns
|
||||
)
|
||||
{
|
||||
auto rg_bf = bf_reader.RowGroup(row_group);
|
||||
|
||||
if (!rg_bf)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
ParquetBloomFilterCondition::ColumnIndexToBF index_to_column_bf;
|
||||
|
||||
for (const auto & [clickhouse_index, parquet_indexes] : clickhouse_column_index_to_parquet_index)
|
||||
{
|
||||
if (!filtering_columns.contains(clickhouse_index))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Complex / nested types contain more than one index. We don't support those.
|
||||
if (parquet_indexes.size() > 1)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
auto parquet_index = parquet_indexes[0];
|
||||
|
||||
auto bf = rg_bf->GetColumnBloomFilter(parquet_index);
|
||||
|
||||
if (!bf)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
index_to_column_bf[clickhouse_index] = std::move(bf);
|
||||
}
|
||||
|
||||
return index_to_column_bf;
|
||||
}
|
||||
|
||||
/// Range of values for each column, based on statistics in the Parquet metadata.
|
||||
/// This is lower/upper bounds, not necessarily exact min and max, e.g. the min/max can be just
|
||||
/// missing in the metadata.
|
||||
@ -474,9 +521,27 @@ void ParquetBlockInputFormat::initializeIfNeeded()
|
||||
ArrowFieldIndexUtil field_util(
|
||||
format_settings.parquet.case_insensitive_column_matching,
|
||||
format_settings.parquet.allow_missing_columns);
|
||||
column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
|
||||
|
||||
auto index_mapping = field_util.findRequiredIndices(getPort().getHeader(), *schema, *metadata);
|
||||
|
||||
for (const auto & [clickhouse_header_index, parquet_indexes] : index_mapping)
|
||||
{
|
||||
for (auto parquet_index : parquet_indexes)
|
||||
{
|
||||
column_indices.push_back(parquet_index);
|
||||
}
|
||||
}
|
||||
|
||||
int num_row_groups = metadata->num_row_groups();
|
||||
|
||||
if (num_row_groups == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const auto bf_reader_properties = parquet::default_reader_properties();
|
||||
std::unique_ptr<parquet::BloomFilterReader> bf_reader;
|
||||
|
||||
row_group_batches.reserve(num_row_groups);
|
||||
|
||||
auto adaptive_chunk_size = [&](int row_group_idx) -> size_t
|
||||
@ -497,11 +562,38 @@ void ParquetBlockInputFormat::initializeIfNeeded()
|
||||
return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
|
||||
};
|
||||
|
||||
std::unique_ptr<ParquetBloomFilterCondition> parquet_bloom_filter_condition;
|
||||
|
||||
std::unordered_set<std::size_t> filtering_columns;
|
||||
|
||||
if (format_settings.parquet.bloom_filter_push_down && key_condition)
|
||||
{
|
||||
bf_reader = parquet::BloomFilterReader::Make(arrow_file, metadata, bf_reader_properties, nullptr);
|
||||
|
||||
const auto parquet_conditions = keyConditionRPNToParquetBloomFilterCondition(
|
||||
key_condition->getRPN(),
|
||||
index_mapping,
|
||||
metadata->RowGroup(0));
|
||||
parquet_bloom_filter_condition = std::make_unique<ParquetBloomFilterCondition>(parquet_conditions, getPort().getHeader());
|
||||
|
||||
filtering_columns = parquet_bloom_filter_condition->getFilteringColumnKeys();
|
||||
}
|
||||
|
||||
for (int row_group = 0; row_group < num_row_groups; ++row_group)
|
||||
{
|
||||
if (skip_row_groups.contains(row_group))
|
||||
continue;
|
||||
|
||||
if (parquet_bloom_filter_condition)
|
||||
{
|
||||
const auto column_index_to_bf = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns);
|
||||
|
||||
if (!parquet_bloom_filter_condition->mayBeTrueOnRowGroup(column_index_to_bf))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (format_settings.parquet.filter_push_down && key_condition
|
||||
&& !key_condition
|
||||
->checkInHyperrectangle(
|
||||
|
@ -38,6 +38,7 @@ private:
|
||||
|
||||
bool supportsCountRows() const override { return true; }
|
||||
size_t countRows(size_t max_block_size) override;
|
||||
bool supportsCustomSerializations() const override { return true; }
|
||||
|
||||
const FormatSettings format_settings;
|
||||
|
||||
|
@ -48,6 +48,7 @@ protected:
|
||||
bool isGarbageAfterField(size_t index, ReadBuffer::Position pos) override;
|
||||
void setReadBuffer(ReadBuffer & in_) override;
|
||||
void readPrefix() override;
|
||||
bool supportsCustomSerializations() const override { return true; }
|
||||
|
||||
const FormatSettings format_settings;
|
||||
DataTypes data_types;
|
||||
|
@ -817,7 +817,7 @@ void KeyCondition::getAllSpaceFillingCurves()
|
||||
KeyCondition::KeyCondition(
|
||||
const ActionsDAG * filter_dag,
|
||||
ContextPtr context,
|
||||
const Names & key_column_names,
|
||||
const Names & key_column_names_,
|
||||
const ExpressionActionsPtr & key_expr_,
|
||||
bool single_point_)
|
||||
: key_expr(key_expr_)
|
||||
@ -825,7 +825,7 @@ KeyCondition::KeyCondition(
|
||||
, single_point(single_point_)
|
||||
{
|
||||
size_t key_index = 0;
|
||||
for (const auto & name : key_column_names)
|
||||
for (const auto & name : key_column_names_)
|
||||
{
|
||||
if (!key_columns.contains(name))
|
||||
{
|
||||
|
@ -149,7 +149,6 @@ void traverse(const ParserImpl::Element & e, std::shared_ptr<JSONNode> node)
|
||||
|
||||
std::shared_ptr<JSONNode> parseJSON(const String & json)
|
||||
{
|
||||
std::string_view view{json.begin(), json.end()};
|
||||
ParserImpl::Element document;
|
||||
ParserImpl p;
|
||||
|
||||
|
@ -62,6 +62,7 @@ public:
|
||||
/// Avoid loading nested table by returning nullptr/false for all table functions.
|
||||
StoragePolicyPtr getStoragePolicy() const override { return nullptr; }
|
||||
bool storesDataOnDisk() const override { return false; }
|
||||
bool supportsReplication() const override { return false; }
|
||||
|
||||
void startup() override { }
|
||||
void shutdown(bool is_drop) override
|
||||
|
@ -0,0 +1,16 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -35,6 +35,7 @@ from pyspark.sql.window import Window
|
||||
|
||||
import helpers.client
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.network import PartitionManager
|
||||
from helpers.s3_tools import (
|
||||
get_file_contents,
|
||||
list_s3_objects,
|
||||
@ -74,10 +75,23 @@ def started_cluster():
|
||||
main_configs=[
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/filesystem_caches.xml",
|
||||
"configs/config.d/remote_servers.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
with_minio=True,
|
||||
stay_alive=True,
|
||||
with_zookeeper=True,
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node2",
|
||||
main_configs=[
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/remote_servers.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
with_minio=True,
|
||||
stay_alive=True,
|
||||
with_zookeeper=True,
|
||||
)
|
||||
|
||||
logging.info("Starting cluster...")
|
||||
@ -891,3 +905,100 @@ def test_filesystem_cache(started_cluster, storage_type):
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_replicated_database_and_unavailable_s3(started_cluster):
|
||||
node1 = started_cluster.instances["node1"]
|
||||
node2 = started_cluster.instances["node2"]
|
||||
|
||||
DB_NAME = randomize_table_name("db")
|
||||
TABLE_NAME = randomize_table_name("test_replicated_database_and_unavailable_s3")
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_restricted_bucket
|
||||
|
||||
if not minio_client.bucket_exists(bucket):
|
||||
minio_client.make_bucket(bucket)
|
||||
|
||||
node1.query(
|
||||
f"CREATE DATABASE {DB_NAME} ENGINE=Replicated('/clickhouse/databases/{DB_NAME}', 'shard1', 'node1')"
|
||||
)
|
||||
node2.query(
|
||||
f"CREATE DATABASE {DB_NAME} ENGINE=Replicated('/clickhouse/databases/{DB_NAME}', 'shard1', 'node2')"
|
||||
)
|
||||
|
||||
parquet_data_path = create_initial_data_file(
|
||||
started_cluster,
|
||||
node1,
|
||||
"SELECT number, toString(number) FROM numbers(100)",
|
||||
TABLE_NAME,
|
||||
)
|
||||
|
||||
endpoint_url = f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
|
||||
aws_access_key_id = "minio"
|
||||
aws_secret_access_key = "minio123"
|
||||
|
||||
schema = pa.schema(
|
||||
[
|
||||
("id", pa.int32()),
|
||||
("name", pa.string()),
|
||||
]
|
||||
)
|
||||
|
||||
data = [
|
||||
pa.array([1, 2, 3], type=pa.int32()),
|
||||
pa.array(["John Doe", "Jane Smith", "Jake Johnson"], type=pa.string()),
|
||||
]
|
||||
storage_options = {
|
||||
"AWS_ENDPOINT_URL": endpoint_url,
|
||||
"AWS_ACCESS_KEY_ID": aws_access_key_id,
|
||||
"AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
|
||||
"AWS_ALLOW_HTTP": "true",
|
||||
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
|
||||
}
|
||||
path = f"s3://root/{TABLE_NAME}"
|
||||
table = pa.Table.from_arrays(data, schema=schema)
|
||||
|
||||
write_deltalake(path, table, storage_options=storage_options)
|
||||
|
||||
with PartitionManager() as pm:
|
||||
pm_rule_reject = {
|
||||
"probability": 1,
|
||||
"destination": node2.ip_address,
|
||||
"source_port": started_cluster.minio_port,
|
||||
"action": "REJECT --reject-with tcp-reset",
|
||||
}
|
||||
pm_rule_drop_all = {
|
||||
"destination": node2.ip_address,
|
||||
"source_port": started_cluster.minio_port,
|
||||
"action": "DROP",
|
||||
}
|
||||
pm._add_rule(pm_rule_reject)
|
||||
|
||||
node1.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS {DB_NAME}.{TABLE_NAME};
|
||||
CREATE TABLE {DB_NAME}.{TABLE_NAME}
|
||||
AS deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{TABLE_NAME}' , 'minio', 'minio123')
|
||||
"""
|
||||
)
|
||||
|
||||
assert TABLE_NAME in node1.query(
|
||||
f"select name from system.tables where database = '{DB_NAME}'"
|
||||
)
|
||||
assert TABLE_NAME in node2.query(
|
||||
f"select name from system.tables where database = '{DB_NAME}'"
|
||||
)
|
||||
|
||||
replica_path = f"/clickhouse/databases/{DB_NAME}/replicas/shard1|node2"
|
||||
zk = started_cluster.get_kazoo_client("zoo1")
|
||||
zk.set(replica_path + "/digest", "123456".encode())
|
||||
|
||||
assert "123456" in node2.query(
|
||||
f"SELECT * FROM system.zookeeper WHERE path = '{replica_path}'"
|
||||
)
|
||||
|
||||
node2.restart_clickhouse()
|
||||
|
||||
assert "123456" not in node2.query(
|
||||
f"SELECT * FROM system.zookeeper WHERE path = '{replica_path}'"
|
||||
)
|
||||
|
@ -0,0 +1,347 @@
|
||||
1000
|
||||
bloom filter is off, all row groups should be read
|
||||
expect rows_read = select count()
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "AZSR",
|
||||
"flba": "WNMM"
|
||||
},
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
}
|
||||
],
|
||||
"rows": 2,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
bloom filter is on, some row groups should be skipped
|
||||
expect rows_read much less than select count()
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "AZSR",
|
||||
"flba": "WNMM"
|
||||
},
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
}
|
||||
],
|
||||
"rows": 2,
|
||||
"statistics": {
|
||||
"rows_read": 464,
|
||||
"bytes_read": 21703
|
||||
}
|
||||
}
|
||||
bloom filter is on, but where predicate contains data from 2 row groups out of 3.
|
||||
Rows read should be less than select count, but greater than previous selects
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
},
|
||||
{
|
||||
"string": "ZHZK",
|
||||
"flba": "HRWD"
|
||||
}
|
||||
],
|
||||
"rows": 2,
|
||||
"statistics": {
|
||||
"rows_read": 536,
|
||||
"bytes_read": 25708
|
||||
}
|
||||
}
|
||||
bloom filter is on, but where predicate contains data from all row groups
|
||||
expect rows_read = select count()
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
},
|
||||
{
|
||||
"string": "OKAI",
|
||||
"flba": "UXGT"
|
||||
},
|
||||
{
|
||||
"string": "ZHZK",
|
||||
"flba": "HRWD"
|
||||
}
|
||||
],
|
||||
"rows": 3,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
IN check
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
},
|
||||
{
|
||||
"string": "ZHZK",
|
||||
"flba": "HRWD"
|
||||
}
|
||||
],
|
||||
"rows": 2,
|
||||
"statistics": {
|
||||
"rows_read": 536,
|
||||
"bytes_read": 25708
|
||||
}
|
||||
}
|
||||
tuple in case, bf is off.
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
tuple in case, bf is on.
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 464,
|
||||
"bytes_read": 21703
|
||||
}
|
||||
}
|
||||
complex tuple in case, bf is off
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
complex tuple in case, bf is on
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"string": "PFJH",
|
||||
"flba": "GKJC"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 464,
|
||||
"bytes_read": 21703
|
||||
}
|
||||
}
|
||||
complex tuple in case, bf is on. Non existent
|
||||
{
|
||||
"data": [],
|
||||
"rows": 0,
|
||||
"statistics": {
|
||||
"rows_read": 0,
|
||||
"bytes_read": 0
|
||||
}
|
||||
}
|
||||
Bloom filter for json column. BF is off
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":38, \"value\":\"NXONM\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
Bloom filter for json column. BF is on
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":38, \"value\":\"NXONM\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 72,
|
||||
"bytes_read": 4005
|
||||
}
|
||||
}
|
||||
Bloom filter for ipv4 column. BF is off
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":38, \"value\":\"NXONM\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
Bloom filter for ipv4 column. BF is on
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":38, \"value\":\"NXONM\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 72,
|
||||
"bytes_read": 4005
|
||||
}
|
||||
}
|
||||
Bloom filter for ipv4 column. BF is on. Specified in the schema
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"ipv4": "0.0.1.143"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 72,
|
||||
"bytes_read": 4005
|
||||
}
|
||||
}
|
||||
Bloom filter on 64 bit column read as ipv4. We explicitly deny it, should read all rg
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"uint64_logical": "22.230.220.164"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
BF off for parquet uint64 logical type. Should read everything
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":683, \"value\":\"YKCPD\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
BF on for parquet uint64 logical type. Uint64 is stored as a signed int 64, but with logical annotation. Make sure a value greater than int64 can be queried
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":683, \"value\":\"YKCPD\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 464,
|
||||
"bytes_read": 21711
|
||||
}
|
||||
}
|
||||
Uint16 is stored as physical type int32 with bidwidth = 16 and sign = false. Make sure a value greater than int16 can be queried. BF is on.
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":874, \"value\":\"JENHW\"}"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 464,
|
||||
"bytes_read": 21703
|
||||
}
|
||||
}
|
||||
BF off for parquet int8 logical type. Should read everything
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":89, \"value\":\"MFIYP\"}"
|
||||
},
|
||||
{
|
||||
"json": "{\"key\":321, \"value\":\"JNOIA\"}"
|
||||
},
|
||||
{
|
||||
"json": "{\"key\":938, \"value\":\"UBMLO\"}"
|
||||
},
|
||||
{
|
||||
"json": "{\"key\":252, \"value\":\"ZVLKF\"}"
|
||||
}
|
||||
],
|
||||
"rows": 4,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
BF on for parquet int8 logical type. Should skip row groups
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"json": "{\"key\":89, \"value\":\"MFIYP\"}"
|
||||
},
|
||||
{
|
||||
"json": "{\"key\":321, \"value\":\"JNOIA\"}"
|
||||
},
|
||||
{
|
||||
"json": "{\"key\":938, \"value\":\"UBMLO\"}"
|
||||
},
|
||||
{
|
||||
"json": "{\"key\":252, \"value\":\"ZVLKF\"}"
|
||||
}
|
||||
],
|
||||
"rows": 4,
|
||||
"statistics": {
|
||||
"rows_read": 536,
|
||||
"bytes_read": 25716
|
||||
}
|
||||
}
|
||||
Invalid column conversion with in operation. String type can not be hashed against parquet int64 physical type. Should read everything
|
||||
{
|
||||
"data": [],
|
||||
"rows": 0,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
||||
Transformations on key column shall not be allowed. Should read everything
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"uint64_logical": "7711695863945021976"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 1000,
|
||||
"bytes_read": 47419
|
||||
}
|
||||
}
|
96
tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
Executable file
96
tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
Executable file
@ -0,0 +1,96 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-ubsan, no-fasttest
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
|
||||
|
||||
WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"
|
||||
|
||||
mkdir -p "${WORKING_DIR}"
|
||||
|
||||
DATA_FILE="${CUR_DIR}/data_parquet/multi_column_bf.gz.parquet"
|
||||
|
||||
DATA_FILE_USER_PATH="${WORKING_DIR}/multi_column_bf.gz.parquet"
|
||||
|
||||
cp ${DATA_FILE} ${DATA_FILE_USER_PATH}
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="select count(*) from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS use_cache_for_count_from_files=false;"
|
||||
|
||||
echo "bloom filter is off, all row groups should be read"
|
||||
echo "expect rows_read = select count()"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "bloom filter is on, some row groups should be skipped"
|
||||
echo "expect rows_read much less than select count()"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "bloom filter is on, but where predicate contains data from 2 row groups out of 3."
|
||||
echo "Rows read should be less than select count, but greater than previous selects"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "bloom filter is on, but where predicate contains data from all row groups"
|
||||
echo "expect rows_read = select count()"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' or uint64_logical=18441251162536403933 order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "IN check"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string in ('PFJH', 'ZHZK') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "tuple in case, bf is off."
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "tuple in case, bf is on."
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "complex tuple in case, bf is off"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "complex tuple in case, bf is on"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "complex tuple in case, bf is on. Non existent"
|
||||
${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('NON2', 'NON2'), ('NON3', 'NON3')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Bloom filter for json column. BF is off"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Bloom filter for json column. BF is on"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Bloom filter for ipv4 column. BF is off"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Bloom filter for ipv4 column. BF is on"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Bloom filter for ipv4 column. BF is on. Specified in the schema"
|
||||
${CLICKHOUSE_CLIENT} --query="select ipv4 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv4 IPv4') where ipv4 = toIPv4('0.0.1.143') order by ipv4 asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Bloom filter on 64 bit column read as ipv4. We explicitly deny it, should read all rg"
|
||||
${CLICKHOUSE_CLIENT} --query="select uint64_logical from file ('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical IPv4') where uint64_logical = toIPv4(5552715629697883300) order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "BF off for parquet uint64 logical type. Should read everything"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "BF on for parquet uint64 logical type. Uint64 is stored as a signed int 64, but with logical annotation. Make sure a value greater than int64 can be queried"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Uint16 is stored as physical type int32 with bidwidth = 16 and sign = false. Make sure a value greater than int16 can be queried. BF is on."
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint16_logical=65528 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "BF off for parquet int8 logical type. Should read everything"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "BF on for parquet int8 logical type. Should skip row groups"
|
||||
${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Invalid column conversion with in operation. String type can not be hashed against parquet int64 physical type. Should read everything"
|
||||
${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical String') where uint64_logical in ('5') order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "Transformations on key column shall not be allowed. Should read everything"
|
||||
${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) where negate(uint64_logical) = -7711695863945021976 order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
rm -rf ${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/*
|
@ -0,0 +1,76 @@
|
||||
bloom filter is off, row groups should be read
|
||||
expect rows_read = select count()
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"ipv6": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 5,
|
||||
"bytes_read": 128
|
||||
}
|
||||
}
|
||||
bloom filter is on for ipv6, row groups should also be read since there is only one. Below queries just make sure the data is properly returned
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"ipv6": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 5,
|
||||
"bytes_read": 128
|
||||
}
|
||||
}
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"ipv6": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 5,
|
||||
"bytes_read": 128
|
||||
}
|
||||
}
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"toIPv6(ipv6)": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995"
|
||||
}
|
||||
],
|
||||
"rows": 1,
|
||||
"statistics": {
|
||||
"rows_read": 5,
|
||||
"bytes_read": 128
|
||||
}
|
||||
}
|
||||
non existent ipv6, row group should be skipped
|
||||
{
|
||||
"data": [],
|
||||
"rows": 0,
|
||||
"statistics": {
|
||||
"rows_read": 0,
|
||||
"bytes_read": 0
|
||||
}
|
||||
}
|
||||
{
|
||||
"data": [],
|
||||
"rows": 0,
|
||||
"statistics": {
|
||||
"rows_read": 0,
|
||||
"bytes_read": 0
|
||||
}
|
||||
}
|
||||
{
|
||||
"data": [],
|
||||
"rows": 0,
|
||||
"statistics": {
|
||||
"rows_read": 5,
|
||||
"bytes_read": 128
|
||||
}
|
||||
}
|
33
tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.sh
Executable file
33
tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.sh
Executable file
@ -0,0 +1,33 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-ubsan, no-fasttest
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
|
||||
|
||||
WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"
|
||||
|
||||
mkdir -p "${WORKING_DIR}"
|
||||
|
||||
DATA_FILE="${CUR_DIR}/data_parquet/ipv6_bloom_filter.gz.parquet"
|
||||
|
||||
DATA_FILE_USER_PATH="${WORKING_DIR}/ipv6_bloom_filter.gz.parquet"
|
||||
|
||||
cp ${DATA_FILE} ${DATA_FILE_USER_PATH}
|
||||
|
||||
echo "bloom filter is off, row groups should be read"
|
||||
echo "expect rows_read = select count()"
|
||||
${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = '7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995' Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "bloom filter is on for ipv6, row groups should also be read since there is only one. Below queries just make sure the data is properly returned"
|
||||
${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = '7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995' Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = toIPv6('7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
${CLICKHOUSE_CLIENT} --query="select toIPv6(ipv6) from file('${DATA_FILE_USER_PATH}', Parquet) where ipv6 = toIPv6('7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
|
||||
echo "non existent ipv6, row group should be skipped"
|
||||
${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = 'fafe:b9d4:e754:4e78:8783:37f5:b2ea:9995' Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = toIPv6('fafe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
||||
${CLICKHOUSE_CLIENT} --query="select toIPv6(ipv6) from file('${DATA_FILE_USER_PATH}', Parquet) where ipv6 = toIPv6('fafe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
|
@ -0,0 +1,102 @@
|
||||
Arrow
|
||||
9260153077572524277
|
||||
ArrowStream
|
||||
9260153077572524277
|
||||
Avro
|
||||
9260153077572524277
|
||||
BSONEachRow
|
||||
9260153077572524277
|
||||
CSV
|
||||
9260153077572524277
|
||||
CSVWithNames
|
||||
9260153077572524277
|
||||
CSVWithNamesAndTypes
|
||||
9260153077572524277
|
||||
CapnProto
|
||||
9260153077572524277
|
||||
CustomSeparated
|
||||
9260153077572524277
|
||||
CustomSeparatedWithNames
|
||||
9260153077572524277
|
||||
CustomSeparatedWithNamesAndTypes
|
||||
9260153077572524277
|
||||
JSON
|
||||
9260153077572524277
|
||||
JSONColumns
|
||||
9260153077572524277
|
||||
JSONColumnsWithMetadata
|
||||
9260153077572524277
|
||||
JSONCompact
|
||||
9260153077572524277
|
||||
JSONCompactColumns
|
||||
9260153077572524277
|
||||
JSONCompactEachRow
|
||||
9260153077572524277
|
||||
JSONCompactEachRowWithNames
|
||||
9260153077572524277
|
||||
JSONCompactEachRowWithNamesAndTypes
|
||||
9260153077572524277
|
||||
JSONCompactStringsEachRow
|
||||
9260153077572524277
|
||||
JSONCompactStringsEachRowWithNames
|
||||
9260153077572524277
|
||||
JSONCompactStringsEachRowWithNamesAndTypes
|
||||
9260153077572524277
|
||||
JSONEachRow
|
||||
9260153077572524277
|
||||
JSONLines
|
||||
9260153077572524277
|
||||
JSONObjectEachRow
|
||||
9260153077572524277
|
||||
JSONStringsEachRow
|
||||
9260153077572524277
|
||||
MsgPack
|
||||
9260153077572524277
|
||||
NDJSON
|
||||
9260153077572524277
|
||||
Native
|
||||
9260153077572524277
|
||||
ORC
|
||||
9260153077572524277
|
||||
Parquet
|
||||
9260153077572524277
|
||||
Raw
|
||||
9260153077572524277
|
||||
RawWithNames
|
||||
9260153077572524277
|
||||
RawWithNamesAndTypes
|
||||
9260153077572524277
|
||||
RowBinary
|
||||
9260153077572524277
|
||||
RowBinaryWithNames
|
||||
9260153077572524277
|
||||
RowBinaryWithNamesAndTypes
|
||||
9260153077572524277
|
||||
TSKV
|
||||
9260153077572524277
|
||||
TSV
|
||||
9260153077572524277
|
||||
TSVRaw
|
||||
9260153077572524277
|
||||
TSVRawWithNames
|
||||
9260153077572524277
|
||||
TSVRawWithNamesAndTypes
|
||||
9260153077572524277
|
||||
TSVWithNames
|
||||
9260153077572524277
|
||||
TSVWithNamesAndTypes
|
||||
9260153077572524277
|
||||
TabSeparated
|
||||
9260153077572524277
|
||||
TabSeparatedRaw
|
||||
9260153077572524277
|
||||
TabSeparatedRawWithNames
|
||||
9260153077572524277
|
||||
TabSeparatedRawWithNamesAndTypes
|
||||
9260153077572524277
|
||||
TabSeparatedWithNames
|
||||
9260153077572524277
|
||||
TabSeparatedWithNamesAndTypes
|
||||
9260153077572524277
|
||||
Values
|
||||
9260153077572524277
|
35
tests/queries/0_stateless/03251_insert_sparse_all_formats.sh
Executable file
35
tests/queries/0_stateless/03251_insert_sparse_all_formats.sh
Executable file
@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest, long
|
||||
|
||||
set -e
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
formats=$($CLICKHOUSE_CLIENT --query "
|
||||
SELECT name FROM system.formats
|
||||
WHERE is_input AND is_output AND name NOT IN ('Template', 'Npy', 'RawBLOB', 'ProtobufList', 'ProtobufSingle', 'Protobuf', 'LineAsString')
|
||||
ORDER BY name FORMAT TSV
|
||||
")
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
DROP TABLE IF EXISTS t_sparse_all_formats;
|
||||
CREATE TABLE t_sparse_all_formats (a UInt64, b UInt64, c String) ENGINE = MergeTree ORDER BY a;
|
||||
"
|
||||
|
||||
for format in $formats; do
|
||||
echo $format
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO t_sparse_all_formats(a) SELECT number FROM numbers(1000)"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT number AS a, 0::UInt64 AS b, '' AS c FROM numbers(1000) FORMAT $format" \
|
||||
| ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT+INTO+t_sparse_all_formats+FORMAT+$format&enable_parsing_to_custom_serialization=1" --data-binary @-
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT number AS a FROM numbers(1000) FORMAT $format" \
|
||||
| ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT+INTO+t_sparse_all_formats(a)+FORMAT+$format&enable_parsing_to_custom_serialization=1" --data-binary @-
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
SELECT sum(sipHash64(*)) FROM t_sparse_all_formats;
|
||||
TRUNCATE TABLE t_sparse_all_formats;
|
||||
"
|
||||
done
|
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue
Block a user