Merge branch 'master' into remove_outdated_settings

This commit is contained in:
mergify[bot] 2021-09-01 14:07:48 +00:00 committed by GitHub
commit 5d299fbdee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
173 changed files with 2330 additions and 913 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit 7d48b2c8193679cc4516e5bd68ae4a64b94dae7d
Subproject commit 06aa8759d17f2032ffd5efa83969270ca9ac727b

View File

@ -377,6 +377,7 @@ function run_tests
# Depends on AWS
01801_s3_cluster
02012_settings_clause_for_s3
# needs psql
01889_postgresql_protocol_null_fields

View File

@ -18,9 +18,6 @@
<!-- One NUMA node w/o hyperthreading -->
<max_threads>12</max_threads>
<!-- mmap shows some improvements in perf tests -->
<min_bytes_to_use_mmap_io>64Mi</min_bytes_to_use_mmap_io>
<!-- disable jit for perf tests -->
<compile_expressions>0</compile_expressions>
<compile_aggregate_expressions>0</compile_aggregate_expressions>

View File

@ -49,6 +49,17 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV NUM_TRIES=1
ENV MAX_RUN_TIME=0
# Download Minio-related binaries
RUN wget 'https://dl.min.io/server/minio/release/linux-amd64/minio' \
&& chmod +x ./minio \
&& wget 'https://dl.min.io/client/mc/release/linux-amd64/mc' \
&& chmod +x ./mc
ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse"
COPY run.sh /
COPY process_functional_tests_result.py /
COPY setup_minio.sh /
CMD ["/bin/bash", "/run.sh"]

View File

@ -17,6 +17,8 @@ dpkg -i package_folder/clickhouse-test_*.deb
# install test configs
/usr/share/clickhouse-test/config/install.sh
./setup_minio.sh
# For flaky check we also enable thread fuzzer
if [ "$NUM_TRIES" -gt "1" ]; then
export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000

View File

@ -0,0 +1,57 @@
#!/bin/bash
# Usage for local run:
#
# ./docker/test/stateless/setup_minio.sh ./tests/
#
set -e -x -a -u
ls -lha
mkdir -p ./minio_data
if [ ! -f ./minio ]; then
echo 'MinIO binary not found, downloading...'
BINARY_TYPE=$(uname -s | tr '[:upper:]' '[:lower:]')
wget "https://dl.min.io/server/minio/release/${BINARY_TYPE}-amd64/minio" \
&& chmod +x ./minio \
&& wget "https://dl.min.io/client/mc/release/${BINARY_TYPE}-amd64/mc" \
&& chmod +x ./mc
fi
MINIO_ROOT_USER=${MINIO_ROOT_USER:-clickhouse}
MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD:-clickhouse}
./minio server --address ":11111" ./minio_data &
while ! curl -v --silent http://localhost:11111 2>&1 | grep AccessDenied
do
echo "Trying to connect to minio"
sleep 1
done
lsof -i :11111
sleep 5
./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
# Upload data to Minio. By default after unpacking all tests will in
# /usr/share/clickhouse-test/queries
TEST_PATH=${1:-/usr/share/clickhouse-test}
MINIO_DATA_PATH=${TEST_PATH}/queries/0_stateless/data_minio
# Iterating over globs will cause redudant FILE variale to be a path to a file, not a filename
# shellcheck disable=SC2045
for FILE in $(ls "${MINIO_DATA_PATH}"); do
echo "$FILE";
./mc cp "${MINIO_DATA_PATH}"/"$FILE" clickminio/test/"$FILE";
done

View File

@ -4,6 +4,28 @@
set -x
# Thread Fuzzer allows to check more permutations of possible thread scheduling
# and find more potential issues.
export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000
export THREAD_FUZZER_SLEEP_PROBABILITY=0.1
export THREAD_FUZZER_SLEEP_TIME_US=100000
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000
dpkg -i package_folder/clickhouse-common-static_*.deb
dpkg -i package_folder/clickhouse-common-static-dbg_*.deb
dpkg -i package_folder/clickhouse-server_*.deb
@ -53,7 +75,7 @@ function start()
counter=0
until clickhouse-client --query "SELECT 1"
do
if [ "$counter" -gt 120 ]
if [ "$counter" -gt 240 ]
then
echo "Cannot start clickhouse-server"
cat /var/log/clickhouse-server/stdout.log

View File

@ -0,0 +1,44 @@
# system.views {#system-views}
Contains the dependencies of all views and the type to which the view belongs. The metadata of the view comes from the [system.tables](tables.md).
Columns:
- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the view is in.
- `name` ([String](../../sql-reference/data-types/string.md)) — Name of the view.
- `main_dependency_database` ([String](../../sql-reference/data-types/string.md)) — The name of the database on which the view depends.
- `main_dependency_table` ([String](../../sql-reference/data-types/string.md)) - The name of the table on which the view depends.
- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. Values:
- `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log.
- `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized).
- `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view).
**Example**
```sql
SELECT * FROM system.views LIMIT 2 FORMAT Vertical;
```
```text
Row 1:
──────
database: default
name: live_view
main_dependency_database: default
main_dependency_table: view_source_tb
view_type: Live
Row 2:
──────
database: default
name: materialized_view
main_dependency_database: default
main_dependency_table: view_source_tb
view_type: Materialized
```
[Original article](https://clickhouse.tech/docs/en/operations/system-tables/views) <!--hide-->

View File

@ -44,7 +44,7 @@ void processTableFiles(const fs::path & path, const String & files_prefix, Strin
writeIntText(fs::file_size(file_path), metadata_buf);
writeChar('\n', metadata_buf);
auto src_buf = createReadBufferFromFileBase(file_path, fs::file_size(file_path), 0, 0, nullptr);
auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
auto dst_buf = create_dst_buf(remote_file_name);
copyData(*src_buf, *dst_buf);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionIf.h>
#include "AggregateFunctionNull.h"
namespace DB
{
@ -46,6 +45,29 @@ public:
}
};
/** Given an array of flags, checks if it's all zeros
* When the buffer is all zeros, this is slightly faster than doing a memcmp since doesn't require allocating memory
* When the buffer has values, this is much faster since it avoids visiting all memory (and the allocation and function calls)
*/
static bool ALWAYS_INLINE inline is_all_zeros(const UInt8 * flags, size_t size)
{
size_t unroll_size = size - size % 8;
size_t i = 0;
while (i < unroll_size)
{
UInt64 v = *reinterpret_cast<const UInt64 *>(&flags[i]);
if (v)
return false;
i += 8;
}
for (; i < size; i++)
if (flags[i])
return false;
return true;
}
/** There are two cases: for single argument and variadic.
* Code for single argument is much more efficient.
*/
@ -112,6 +134,38 @@ public:
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const UInt8 * null_map = column->getNullMapData().data();
const IColumn * columns_param[] = {&column->getNestedColumn()};
const IColumn * filter_column = columns[num_arguments - 1];
if (const ColumnNullable * nullable_column = typeid_cast<const ColumnNullable *>(filter_column))
filter_column = nullable_column->getNestedColumnPtr().get();
if constexpr (result_is_nullable)
{
/// We need to check if there is work to do as otherwise setting the flag would be a mistake,
/// it would mean that the return value would be the default value of the nested type instead of NULL
if (is_all_zeros(assert_cast<const ColumnUInt8 *>(filter_column)->getData().data(), batch_size))
return;
}
/// Combine the 2 flag arrays so we can call a simplified version (one check vs 2)
/// Note that now the null map will contain 0 if not null and not filtered, or 1 for null or filtered (or both)
const auto * filter_flags = assert_cast<const ColumnUInt8 *>(filter_column)->getData().data();
auto final_nulls = std::make_unique<UInt8[]>(batch_size);
for (size_t i = 0; i < batch_size; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_flags[i]);
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), columns_param, final_nulls.get(), arena, -1);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))
this->setFlag(place);
}
#if USE_EMBEDDED_COMPILER
void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const DataTypes & arguments_types, const std::vector<llvm::Value *> & argument_values) const override

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <experimental/type_traits>
#include <type_traits>
@ -96,8 +97,9 @@ struct AggregateFunctionSumData
Impl::add(sum, local_sum);
}
template <typename Value>
void NO_SANITIZE_UNDEFINED NO_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
template <typename Value, bool add_if_zero>
void NO_SANITIZE_UNDEFINED NO_INLINE
addManyConditional_internal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t count)
{
const auto * end = ptr + count;
@ -110,10 +112,10 @@ struct AggregateFunctionSumData
T local_sum{};
while (ptr < end)
{
T multiplier = !*null_map;
T multiplier = !*condition_map == add_if_zero;
Impl::add(local_sum, *ptr * multiplier);
++ptr;
++null_map;
++condition_map;
}
Impl::add(sum, local_sum);
return;
@ -130,13 +132,13 @@ struct AggregateFunctionSumData
{
for (size_t i = 0; i < unroll_count; ++i)
{
if (!null_map[i])
if (!condition_map[i] == add_if_zero)
{
Impl::add(partial_sums[i], ptr[i]);
}
}
ptr += unroll_count;
null_map += unroll_count;
condition_map += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
@ -146,14 +148,26 @@ struct AggregateFunctionSumData
T local_sum{};
while (ptr < end)
{
if (!*null_map)
if (!*condition_map == add_if_zero)
Impl::add(local_sum, *ptr);
++ptr;
++null_map;
++condition_map;
}
Impl::add(sum, local_sum);
}
template <typename Value>
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
{
return addManyConditional_internal<Value, true>(ptr, null_map, count);
}
template <typename Value>
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t count)
{
return addManyConditional_internal<Value, false>(ptr, cond_map, count);
}
void NO_SANITIZE_UNDEFINED merge(const AggregateFunctionSumData & rhs)
{
Impl::add(sum, rhs.sum);
@ -229,8 +243,8 @@ struct AggregateFunctionSumKahanData
}
}
template <typename Value>
void NO_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
template <typename Value, bool add_if_zero>
void NO_INLINE addManyConditional_internal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t count)
{
constexpr size_t unroll_count = 4;
T partial_sums[unroll_count]{};
@ -242,10 +256,10 @@ struct AggregateFunctionSumKahanData
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
if (!null_map[i])
if ((!condition_map[i]) == add_if_zero)
addImpl(ptr[i], partial_sums[i], partial_compensations[i]);
ptr += unroll_count;
null_map += unroll_count;
condition_map += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
@ -253,13 +267,25 @@ struct AggregateFunctionSumKahanData
while (ptr < end)
{
if (!*null_map)
if ((!*condition_map) == add_if_zero)
addImpl(*ptr, sum, compensation);
++ptr;
++null_map;
++condition_map;
}
}
template <typename Value>
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
{
return addManyConditional_internal<Value, true>(ptr, null_map, count);
}
template <typename Value>
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t count)
{
return addManyConditional_internal<Value, false>(ptr, cond_map, count);
}
void ALWAYS_INLINE mergeImpl(T & to_sum, T & to_compensation, T from_sum, T from_compensation)
{
auto raw_sum = to_sum + from_sum;
@ -352,40 +378,38 @@ public:
this->data(place).add(column.getData()[row_num]);
}
/// Vectorized version when there is no GROUP BY keys.
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
add(place, columns, i, arena);
}
this->data(place).addManyConditional(column.getData().data(), flags.data(), batch_size);
}
else
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena, ssize_t if_argument_pos)
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *, ssize_t if_argument_pos)
const override
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
/// Merge the 2 sets of flags (null and if) into a single one. This allows us to use parallelizable sums when available
const auto * if_flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();
auto final_flags = std::make_unique<UInt8[]>(batch_size);
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i] && flags[i])
add(place, columns, i, arena);
final_flags[i] = (!null_map[i]) & if_flags[i];
this->data(place).addManyConditional(column.getData().data(), final_flags.get(), batch_size);
}
else
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
}
}

View File

@ -459,8 +459,6 @@ public:
explicit FieldVisitorMax(const Field & rhs_) : rhs(rhs_) {}
bool operator() (Null &) const { throw Exception("Cannot compare Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (NegativeInfinity &) const { throw Exception("Cannot compare -Inf", ErrorCodes::LOGICAL_ERROR); }
bool operator() (PositiveInfinity &) const { throw Exception("Cannot compare +Inf", ErrorCodes::LOGICAL_ERROR); }
bool operator() (AggregateFunctionStateData &) const { throw Exception("Cannot compare AggregateFunctionStates", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array & x) const { return compareImpl<Array>(x); }
@ -496,8 +494,6 @@ public:
explicit FieldVisitorMin(const Field & rhs_) : rhs(rhs_) {}
bool operator() (Null &) const { throw Exception("Cannot compare Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (NegativeInfinity &) const { throw Exception("Cannot compare -Inf", ErrorCodes::LOGICAL_ERROR); }
bool operator() (PositiveInfinity &) const { throw Exception("Cannot compare +Inf", ErrorCodes::LOGICAL_ERROR); }
bool operator() (AggregateFunctionStateData &) const { throw Exception("Cannot sum AggregateFunctionStates", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array & x) const { return compareImpl<Array>(x); }

View File

@ -190,6 +190,7 @@ public:
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const = 0;
/** The same for single place when need to aggregate only filtered data.
* Instead of using an if-column, the condition is combined inside the null_map
*/
virtual void addBatchSinglePlaceNotNull(
size_t batch_size,

View File

@ -41,7 +41,7 @@ std::unique_ptr<ReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
if (disk)
return disk->readFile(file_path);
else
return createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
return createReadBufferFromFileBase(file_path, {}, 0);
}
}

View File

@ -10,7 +10,7 @@ namespace
{
String readFile(const String & file_path)
{
auto buf = createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
auto buf = createReadBufferFromFileBase(file_path, {}, 0);
String s;
readStringUntilEOF(s, *buf);
return s;

View File

@ -8,8 +8,7 @@
#include <Client/HedgedConnectionsFactory.h>
#include <Client/IConnections.h>
#include <Client/PacketReceiver.h>
#include <Common/FiberStack.h>
#include <Common/Fiber.h>
namespace DB
{

View File

@ -577,15 +577,15 @@ void getExtremesWithNulls(const IColumn & nested_column, const NullMap & null_ar
}
else if (number_of_nulls == n)
{
min = PositiveInfinity();
max = PositiveInfinity();
min = POSITIVE_INFINITY;
max = POSITIVE_INFINITY;
}
else
{
auto filtered_column = nested_column.filter(not_null_array, -1);
filtered_column->getExtremes(min, max);
if (null_last)
max = PositiveInfinity();
max = POSITIVE_INFINITY;
}
}
}

View File

@ -85,7 +85,7 @@ size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * nu
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos)
for (; pos < end; ++pos, ++pos2)
count += (*pos & ~*pos2) != 0;
return count;

View File

@ -76,6 +76,7 @@
M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \
M(SyncDrainedConnections, "Number of connections drained synchronously.") \
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
namespace CurrentMetrics
{

View File

@ -583,6 +583,8 @@
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \
M(614, POSTGRESQL_CONNECTION_FAILURE) \
M(615, CANNOT_ADVISE) \
M(616, UNKNOWN_READ_METHOD) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -1,4 +1,6 @@
#pragma once
/// defines.h should be included before fiber.hpp
/// BOOST_USE_ASAN, BOOST_USE_TSAN and BOOST_USE_UCONTEXT should be correctly defined for sanitizers.
#include <common/defines.h>
#include <boost/context/fiber.hpp>

View File

@ -26,16 +26,6 @@ public:
throw Exception("Cannot convert NULL to " + demangle(typeid(T).name()), ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const NegativeInfinity &) const
{
throw Exception("Cannot convert -Inf to " + demangle(typeid(T).name()), ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const PositiveInfinity &) const
{
throw Exception("Cannot convert +Inf to " + demangle(typeid(T).name()), ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const String &) const
{
throw Exception("Cannot convert String to " + demangle(typeid(T).name()), ErrorCodes::CANNOT_CONVERT_TYPE);

View File

@ -24,9 +24,7 @@ static inline void writeQuoted(const DecimalField<T> & x, WriteBuffer & buf)
writeChar('\'', buf);
}
String FieldVisitorDump::operator() (const Null &) const { return "NULL"; }
String FieldVisitorDump::operator() (const NegativeInfinity &) const { return "-Inf"; }
String FieldVisitorDump::operator() (const PositiveInfinity &) const { return "+Inf"; }
String FieldVisitorDump::operator() (const Null & x) const { return x.isNegativeInfinity() ? "-Inf" : (x.isPositiveInfinity() ? "+Inf" : "NULL"); }
String FieldVisitorDump::operator() (const UInt64 & x) const { return formatQuotedWithPrefix(x, "UInt64_"); }
String FieldVisitorDump::operator() (const Int64 & x) const { return formatQuotedWithPrefix(x, "Int64_"); }
String FieldVisitorDump::operator() (const Float64 & x) const { return formatQuotedWithPrefix(x, "Float64_"); }

View File

@ -10,8 +10,6 @@ class FieldVisitorDump : public StaticVisitor<String>
{
public:
String operator() (const Null & x) const;
String operator() (const NegativeInfinity & x) const;
String operator() (const PositiveInfinity & x) const;
String operator() (const UInt64 & x) const;
String operator() (const UInt128 & x) const;
String operator() (const UInt256 & x) const;

View File

@ -14,18 +14,6 @@ void FieldVisitorHash::operator() (const Null &) const
hash.update(type);
}
void FieldVisitorHash::operator() (const NegativeInfinity &) const
{
UInt8 type = Field::Types::NegativeInfinity;
hash.update(type);
}
void FieldVisitorHash::operator() (const PositiveInfinity &) const
{
UInt8 type = Field::Types::PositiveInfinity;
hash.update(type);
}
void FieldVisitorHash::operator() (const UInt64 & x) const
{
UInt8 type = Field::Types::UInt64;

View File

@ -16,8 +16,6 @@ public:
FieldVisitorHash(SipHash & hash_);
void operator() (const Null & x) const;
void operator() (const NegativeInfinity & x) const;
void operator() (const PositiveInfinity & x) const;
void operator() (const UInt64 & x) const;
void operator() (const UInt128 & x) const;
void operator() (const UInt256 & x) const;

View File

@ -22,8 +22,6 @@ bool FieldVisitorSum::operator() (UInt64 & x) const
bool FieldVisitorSum::operator() (Float64 & x) const { x += get<Float64>(rhs); return x != 0; }
bool FieldVisitorSum::operator() (Null &) const { throw Exception("Cannot sum Nulls", ErrorCodes::LOGICAL_ERROR); }
bool FieldVisitorSum::operator() (NegativeInfinity &) const { throw Exception("Cannot sum -Inf", ErrorCodes::LOGICAL_ERROR); }
bool FieldVisitorSum::operator() (PositiveInfinity &) const { throw Exception("Cannot sum +Inf", ErrorCodes::LOGICAL_ERROR); }
bool FieldVisitorSum::operator() (String &) const { throw Exception("Cannot sum Strings", ErrorCodes::LOGICAL_ERROR); }
bool FieldVisitorSum::operator() (Array &) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
bool FieldVisitorSum::operator() (Tuple &) const { throw Exception("Cannot sum Tuples", ErrorCodes::LOGICAL_ERROR); }

View File

@ -21,8 +21,6 @@ public:
bool operator() (UInt64 & x) const;
bool operator() (Float64 & x) const;
bool operator() (Null &) const;
bool operator() (NegativeInfinity & x) const;
bool operator() (PositiveInfinity & x) const;
bool operator() (String &) const;
bool operator() (Array &) const;
bool operator() (Tuple &) const;

View File

@ -52,9 +52,7 @@ static String formatFloat(const Float64 x)
}
String FieldVisitorToString::operator() (const Null &) const { return "NULL"; }
String FieldVisitorToString::operator() (const NegativeInfinity &) const { return "-Inf"; }
String FieldVisitorToString::operator() (const PositiveInfinity &) const { return "+Inf"; }
String FieldVisitorToString::operator() (const Null & x) const { return x.isNegativeInfinity() ? "-Inf" : (x.isPositiveInfinity() ? "+Inf" : "NULL"); }
String FieldVisitorToString::operator() (const UInt64 & x) const { return formatQuoted(x); }
String FieldVisitorToString::operator() (const Int64 & x) const { return formatQuoted(x); }
String FieldVisitorToString::operator() (const Float64 & x) const { return formatFloat(x); }

View File

@ -10,8 +10,6 @@ class FieldVisitorToString : public StaticVisitor<String>
{
public:
String operator() (const Null & x) const;
String operator() (const NegativeInfinity & x) const;
String operator() (const PositiveInfinity & x) const;
String operator() (const UInt64 & x) const;
String operator() (const UInt128 & x) const;
String operator() (const UInt256 & x) const;

View File

@ -6,9 +6,7 @@
namespace DB
{
void FieldVisitorWriteBinary::operator() (const Null &, WriteBuffer &) const { }
void FieldVisitorWriteBinary::operator() (const NegativeInfinity &, WriteBuffer &) const { }
void FieldVisitorWriteBinary::operator() (const PositiveInfinity &, WriteBuffer &) const { }
void FieldVisitorWriteBinary::operator() (const Null &, WriteBuffer &) const {}
void FieldVisitorWriteBinary::operator() (const UInt64 & x, WriteBuffer & buf) const { writeVarUInt(x, buf); }
void FieldVisitorWriteBinary::operator() (const Int64 & x, WriteBuffer & buf) const { writeVarInt(x, buf); }
void FieldVisitorWriteBinary::operator() (const Float64 & x, WriteBuffer & buf) const { writeFloatBinary(x, buf); }

View File

@ -9,8 +9,6 @@ class FieldVisitorWriteBinary
{
public:
void operator() (const Null & x, WriteBuffer & buf) const;
void operator() (const NegativeInfinity & x, WriteBuffer & buf) const;
void operator() (const PositiveInfinity & x, WriteBuffer & buf) const;
void operator() (const UInt64 & x, WriteBuffer & buf) const;
void operator() (const UInt128 & x, WriteBuffer & buf) const;
void operator() (const UInt256 & x, WriteBuffer & buf) const;

View File

@ -26,11 +26,11 @@ public:
template <typename T, typename U>
bool operator() (const T & l, const U & r) const
{
if constexpr (std::is_same_v<T, Null> || std::is_same_v<U, Null>
|| std::is_same_v<T, NegativeInfinity> || std::is_same_v<T, PositiveInfinity>
|| std::is_same_v<U, NegativeInfinity> || std::is_same_v<U, PositiveInfinity>)
if constexpr (std::is_same_v<T, Null> || std::is_same_v<U, Null>)
{
return std::is_same_v<T, U>;
if constexpr (std::is_same_v<T, Null> && std::is_same_v<U, Null>)
return l == r;
return false;
}
else
{
@ -79,12 +79,18 @@ public:
template <typename T, typename U>
bool operator() (const T & l, const U & r) const
{
if constexpr (std::is_same_v<T, Null> || std::is_same_v<U, Null>)
return false;
else if constexpr (std::is_same_v<T, NegativeInfinity> || std::is_same_v<U, PositiveInfinity>)
return !std::is_same_v<T, U>;
else if constexpr (std::is_same_v<U, NegativeInfinity> || std::is_same_v<T, PositiveInfinity>)
return false;
if constexpr (std::is_same_v<T, Null> && std::is_same_v<U, Null>)
{
return l.isNegativeInfinity() && r.isPositiveInfinity();
}
else if constexpr (std::is_same_v<T, Null>)
{
return l.isNegativeInfinity();
}
else if constexpr (std::is_same_v<U, Null>)
{
return r.isPositiveInfinity();
}
else
{
if constexpr (std::is_same_v<T, U>)

View File

@ -251,6 +251,15 @@
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \
\
M(ThreadPoolReaderPageCacheHit, "Number of times the read inside ThreadPoolReader was done from page cache.") \
M(ThreadPoolReaderPageCacheHitBytes, "Number of bytes read inside ThreadPoolReader when it was done from page cache.") \
M(ThreadPoolReaderPageCacheHitElapsedMicroseconds, "Time spent reading data from page cache in ThreadPoolReader.") \
M(ThreadPoolReaderPageCacheMiss, "Number of times the read inside ThreadPoolReader was not done from page cache and was hand off to thread pool.") \
M(ThreadPoolReaderPageCacheMissBytes, "Number of bytes read inside ThreadPoolReader when read was not done from page cache and was hand off to thread pool.") \
M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \
\
M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
namespace ProfileEvents

View File

@ -173,7 +173,7 @@ void QueryProfilerBase<ProfilerImpl>::tryCleanup()
}
template class QueryProfilerBase<QueryProfilerReal>;
template class QueryProfilerBase<QueryProfilerCpu>;
template class QueryProfilerBase<QueryProfilerCPU>;
QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_MONOTONIC, period, SIGUSR1)
@ -185,11 +185,11 @@ void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
writeTraceInfo(TraceType::Real, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
QueryProfilerCPU::QueryProfilerCPU(const UInt64 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2)
{}
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
void QueryProfilerCPU::signalHandler(int sig, siginfo_t * info, void * context)
{
DENY_ALLOCATIONS_IN_SCOPE;
writeTraceInfo(TraceType::CPU, sig, info, context);

View File

@ -26,7 +26,7 @@ namespace DB
* 3. write collected stack trace to trace_pipe for TraceCollector
*
* Destructor tries to unset timer and restore previous signal handler.
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCpu.
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCPU.
*/
template <typename ProfilerImpl>
class QueryProfilerBase
@ -62,10 +62,10 @@ public:
};
/// Query profiler with timer based on CPU clock
class QueryProfilerCpu : public QueryProfilerBase<QueryProfilerCpu>
class QueryProfilerCPU : public QueryProfilerBase<QueryProfilerCPU>
{
public:
QueryProfilerCpu(const UInt64 thread_id, const UInt32 period);
QueryProfilerCPU(const UInt64 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};

View File

@ -158,7 +158,7 @@ std::pair<bool, std::string> StudentTTest::compareAndReport(size_t confidence_le
if (mean_difference > mean_confidence_interval && (mean_difference - mean_confidence_interval > 0.0001)) /// difference must be more than 0.0001, to take into account connection latency.
{
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence : ";
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence: ";
ss << std::fixed << std::setprecision(8) << "mean difference is " << mean_difference << ", but confidence interval is " << mean_confidence_interval;
return {false, ss.str()};
}

View File

@ -29,7 +29,7 @@ namespace DB
class QueryStatus;
class ThreadStatus;
class QueryProfilerReal;
class QueryProfilerCpu;
class QueryProfilerCPU;
class QueryThreadLog;
struct OpenTelemetrySpanHolder;
class TasksStatsCounters;
@ -140,7 +140,7 @@ protected:
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
std::unique_ptr<QueryProfilerCPU> query_profiler_cpu;
Poco::Logger * log = nullptr;

View File

@ -130,8 +130,27 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
}
if (!chroot.empty() && !exists("/"))
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::Error::ZNONODE);
if (!chroot.empty())
{
/// Here we check that zk root exists.
/// This check is clumsy. The reason is we do this request under common mutex, and never want to hung here.
/// Otherwise, all threads which need zk will wait for this mutex eternally.
///
/// Usually, this was possible in case of memory limit exception happened inside zk implementation.
/// This should not happen now, when memory tracker is disabled.
/// But let's keep it just in case (it is also easy to backport).
auto future = asyncExists("/");
auto res = future.wait_for(std::chrono::milliseconds(operation_timeout_ms));
if (res != std::future_status::ready)
throw KeeperException("Cannot check if zookeeper root exists.", Coordination::Error::ZOPERATIONTIMEOUT);
auto code = future.get().error;
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, "/");
if (code == Coordination::Error::ZNONODE)
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::Error::ZNONODE);
}
}
ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_,

View File

@ -32,7 +32,8 @@ protected:
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
///
/// If always_copy is true then even if the compressed block is already stored in compressed_in.buffer() it will be copied into own_compressed_buffer.
/// If always_copy is true then even if the compressed block is already stored in compressed_in.buffer()
/// it will be copied into own_compressed_buffer.
/// This is required for CheckingCompressedReadBuffer, since this is just a proxy.
///
/// Returns number of compressed bytes read.

View File

@ -36,6 +36,13 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
void CompressedReadBufferFromFile::prefetch()
{
file_in.prefetch();
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
@ -46,14 +53,11 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadB
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path,
const ReadSettings & settings,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache,
size_t buf_size,
bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache, buf_size))
, p_file_in(createReadBufferFromFileBase(path, settings, estimated_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;

View File

@ -1,7 +1,8 @@
#pragma once
#include "CompressedReadBufferBase.h"
#include <Compression/CompressedReadBufferBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <time.h>
#include <memory>
@ -28,13 +29,13 @@ private:
size_t size_compressed = 0;
bool nextImpl() override;
void prefetch() override;
public:
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, bool allow_different_codecs_ = false);
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -37,7 +37,7 @@ int main(int argc, char ** argv)
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0, nullptr);
return createReadBufferFromFileBase(path, {}, 0);
},
&cache
);
@ -56,7 +56,7 @@ int main(int argc, char ** argv)
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0, nullptr);
return createReadBufferFromFileBase(path, {}, 0);
},
&cache
);

View File

@ -484,19 +484,14 @@ template bool decimalLessOrEqual<Decimal256>(Decimal256 x, Decimal256 y, UInt32
template bool decimalLessOrEqual<DateTime64>(DateTime64 x, DateTime64 y, UInt32 x_scale, UInt32 y_scale);
inline void writeText(const Null &, WriteBuffer & buf)
inline void writeText(const Null & x, WriteBuffer & buf)
{
writeText(std::string("NULL"), buf);
}
inline void writeText(const NegativeInfinity &, WriteBuffer & buf)
{
writeText(std::string("-Inf"), buf);
}
inline void writeText(const PositiveInfinity &, WriteBuffer & buf)
{
writeText(std::string("+Inf"), buf);
if (x.isNegativeInfinity())
writeText(std::string("-Inf"), buf);
if (x.isPositiveInfinity())
writeText(std::string("+Inf"), buf);
else
writeText(std::string("NULL"), buf);
}
String toString(const Field & x)

View File

@ -28,6 +28,9 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
constexpr Null NEGATIVE_INFINITY{Null::Value::NegativeInfinity};
constexpr Null POSITIVE_INFINITY{Null::Value::PositiveInfinity};
class Field;
using FieldVector = std::vector<Field, AllocatorWithMemoryTracking<Field>>;
@ -218,8 +221,6 @@ template <> struct NearestFieldTypeImpl<Tuple> { using Type = Tuple; };
template <> struct NearestFieldTypeImpl<Map> { using Type = Map; };
template <> struct NearestFieldTypeImpl<bool> { using Type = UInt64; };
template <> struct NearestFieldTypeImpl<Null> { using Type = Null; };
template <> struct NearestFieldTypeImpl<NegativeInfinity> { using Type = NegativeInfinity; };
template <> struct NearestFieldTypeImpl<PositiveInfinity> { using Type = PositiveInfinity; };
template <> struct NearestFieldTypeImpl<AggregateFunctionStateData> { using Type = AggregateFunctionStateData; };
@ -281,10 +282,6 @@ public:
Int256 = 25,
Map = 26,
UUID = 27,
// Special types for index analysis
NegativeInfinity = 254,
PositiveInfinity = 255,
};
static const char * toString(Which which)
@ -292,8 +289,6 @@ public:
switch (which)
{
case Null: return "Null";
case NegativeInfinity: return "-Inf";
case PositiveInfinity: return "+Inf";
case UInt64: return "UInt64";
case UInt128: return "UInt128";
case UInt256: return "UInt256";
@ -337,10 +332,7 @@ public:
!std::is_same_v<std::decay_t<T>, bool> &&
!std::is_same_v<NearestFieldType<std::decay_t<T>>, String>, Z>;
Field() //-V730
: which(Types::Null)
{
}
Field() : Field(Null{}) {}
/** Despite the presence of a template constructor, this constructor is still needed,
* since, in its absence, the compiler will still generate the default constructor.
@ -427,12 +419,7 @@ public:
Types::Which getType() const { return which; }
const char * getTypeName() const { return Types::toString(which); }
// Non-valued field are all denoted as Null
bool isNull() const { return which == Types::Null || which == Types::NegativeInfinity || which == Types::PositiveInfinity; }
bool isNegativeInfinity() const { return which == Types::NegativeInfinity; }
bool isPositiveInfinity() const { return which == Types::PositiveInfinity; }
bool isNull() const { return which == Types::Null; }
template <typename T>
NearestFieldType<std::decay_t<T>> & get();
@ -443,6 +430,9 @@ public:
return mutable_this->get<T>();
}
bool isNegativeInfinity() const { return which == Types::Null && get<Null>().isNegativeInfinity(); }
bool isPositiveInfinity() const { return which == Types::Null && get<Null>().isPositiveInfinity(); }
template <typename T>
T & reinterpret();
@ -485,10 +475,7 @@ public:
switch (which)
{
case Types::Null:
case Types::NegativeInfinity:
case Types::PositiveInfinity:
return false;
case Types::Null: return false;
case Types::UInt64: return get<UInt64>() < rhs.get<UInt64>();
case Types::UInt128: return get<UInt128>() < rhs.get<UInt128>();
case Types::UInt256: return get<UInt256>() < rhs.get<UInt256>();
@ -525,10 +512,7 @@ public:
switch (which)
{
case Types::Null:
case Types::NegativeInfinity:
case Types::PositiveInfinity:
return true;
case Types::Null: return true;
case Types::UInt64: return get<UInt64>() <= rhs.get<UInt64>();
case Types::UInt128: return get<UInt128>() <= rhs.get<UInt128>();
case Types::UInt256: return get<UInt256>() <= rhs.get<UInt256>();
@ -565,10 +549,7 @@ public:
switch (which)
{
case Types::Null:
case Types::NegativeInfinity:
case Types::PositiveInfinity:
return true;
case Types::Null: return true;
case Types::UInt64: return get<UInt64>() == rhs.get<UInt64>();
case Types::Int64: return get<Int64>() == rhs.get<Int64>();
case Types::Float64:
@ -608,8 +589,6 @@ public:
switch (field.which)
{
case Types::Null: return f(field.template get<Null>());
case Types::NegativeInfinity: return f(field.template get<NegativeInfinity>());
case Types::PositiveInfinity: return f(field.template get<PositiveInfinity>());
// gcc 8.2.1
#if !defined(__clang__)
#pragma GCC diagnostic push
@ -767,9 +746,7 @@ private:
using Row = std::vector<Field>;
template <> struct Field::TypeToEnum<Null> { static const Types::Which value = Types::Null; };
template <> struct Field::TypeToEnum<NegativeInfinity> { static const Types::Which value = Types::NegativeInfinity; };
template <> struct Field::TypeToEnum<PositiveInfinity> { static const Types::Which value = Types::PositiveInfinity; };
template <> struct Field::TypeToEnum<Null> { static const Types::Which value = Types::Null; };
template <> struct Field::TypeToEnum<UInt64> { static const Types::Which value = Types::UInt64; };
template <> struct Field::TypeToEnum<UInt128> { static const Types::Which value = Types::UInt128; };
template <> struct Field::TypeToEnum<UInt256> { static const Types::Which value = Types::UInt256; };
@ -790,8 +767,6 @@ template <> struct Field::TypeToEnum<DecimalField<DateTime64>>{ static const Typ
template <> struct Field::TypeToEnum<AggregateFunctionStateData>{ static const Types::Which value = Types::AggregateFunctionState; };
template <> struct Field::EnumToType<Field::Types::Null> { using Type = Null; };
template <> struct Field::EnumToType<Field::Types::NegativeInfinity> { using Type = NegativeInfinity; };
template <> struct Field::EnumToType<Field::Types::PositiveInfinity> { using Type = PositiveInfinity; };
template <> struct Field::EnumToType<Field::Types::UInt64> { using Type = UInt64; };
template <> struct Field::EnumToType<Field::Types::UInt128> { using Type = UInt128; };
template <> struct Field::EnumToType<Field::Types::UInt256> { using Type = UInt256; };

View File

@ -3,6 +3,7 @@
#include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h>
#include <Core/Defines.h>
#include <IO/ReadSettings.h>
namespace Poco::Util
@ -177,6 +178,7 @@ class IColumn;
M(LogQueriesType, log_queries_min_type, QueryLogElementType::QUERY_START, "Minimal type in query_log to log, possible values (from low to high): QUERY_START, QUERY_FINISH, EXCEPTION_BEFORE_START, EXCEPTION_WHILE_PROCESSING.", 0) \
M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log/query_views_log.", 0) \
M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \
M(Float, log_queries_probability, 1., "Log queries with the specified probabality.", 0) \
\
M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \
\
@ -499,6 +501,11 @@ class IColumn;
M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable', 'disable', 'force_enable'", 0) \
\
M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \

View File

@ -13,9 +13,33 @@ namespace DB
/// Data types for representing elementary values from a database in RAM.
struct Null {};
struct NegativeInfinity {};
struct PositiveInfinity {};
/// Hold a null value for untyped calculation. It can also store infinities to handle nullable
/// comparison which is used for nullable KeyCondition.
struct Null
{
enum class Value
{
Null,
PositiveInfinity,
NegativeInfinity,
};
Value value{Value::Null};
bool isNull() const { return value == Value::Null; }
bool isPositiveInfinity() const { return value == Value::PositiveInfinity; }
bool isNegativeInfinity() const { return value == Value::NegativeInfinity; }
bool operator==(const Null & other) const
{
return value == other.value;
}
bool operator!=(const Null & other) const
{
return !(*this == other);
}
};
/// Ignore strange gcc warning https://gcc.gnu.org/bugzilla/show_bug.cgi?id=55776
#if !defined(__clang__)

View File

@ -13,15 +13,15 @@
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Common/FiberStack.h>
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace CurrentMetrics
{
extern const Metric SyncDrainedConnections;
extern const Metric ActiveSyncDrainedConnections;
extern const Metric SyncDrainedConnections;
extern const Metric ActiveSyncDrainedConnections;
}
namespace DB

View File

@ -6,10 +6,10 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Common/FiberStack.h>
#include <Common/TimerDescriptor.h>
#include <variant>
namespace DB
{

View File

@ -19,7 +19,6 @@ namespace DB
namespace ErrorCodes
{
extern const int EMPTY_DATA_PASSED;
extern const int LOGICAL_ERROR;
}
@ -28,16 +27,6 @@ DataTypePtr FieldToDataType::operator() (const Null &) const
return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeNothing>());
}
DataTypePtr FieldToDataType::operator() (const NegativeInfinity &) const
{
throw Exception("It's invalid to have -inf literals in SQL", ErrorCodes::LOGICAL_ERROR);
}
DataTypePtr FieldToDataType::operator() (const PositiveInfinity &) const
{
throw Exception("It's invalid to have +inf literals in SQL", ErrorCodes::LOGICAL_ERROR);
}
DataTypePtr FieldToDataType::operator() (const UInt64 & x) const
{
if (x <= std::numeric_limits<UInt8>::max()) return std::make_shared<DataTypeUInt8>();

View File

@ -21,8 +21,6 @@ class FieldToDataType : public StaticVisitor<DataTypePtr>
{
public:
DataTypePtr operator() (const Null & x) const;
DataTypePtr operator() (const NegativeInfinity & x) const;
DataTypePtr operator() (const PositiveInfinity & x) const;
DataTypePtr operator() (const UInt64 & x) const;
DataTypePtr operator() (const UInt128 & x) const;
DataTypePtr operator() (const UInt256 & x) const;

View File

@ -88,19 +88,16 @@ std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(
std::unique_ptr<ReadBufferFromFileBase>
DiskCacheWrapper::readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
const ReadSettings & settings,
size_t estimated_size) const
{
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, settings, estimated_size);
LOG_DEBUG(log, "Read file {} from cache", backQuote(path));
if (cache_disk->exists(path))
return cache_disk->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return cache_disk->readFile(path, settings, estimated_size);
auto metadata = acquireDownloadMetadata(path);
@ -134,8 +131,8 @@ DiskCacheWrapper::readFile(
auto tmp_path = path + ".tmp";
{
auto src_buffer = DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
auto dst_buffer = cache_disk->writeFile(tmp_path, buf_size, WriteMode::Rewrite);
auto src_buffer = DiskDecorator::readFile(path, settings, estimated_size);
auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite);
copyData(*src_buffer, *dst_buffer);
}
cache_disk->moveFile(tmp_path, path);
@ -158,9 +155,9 @@ DiskCacheWrapper::readFile(
}
if (metadata->status == DOWNLOADED)
return cache_disk->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return cache_disk->readFile(path, settings, estimated_size);
return DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, settings, estimated_size);
}
std::unique_ptr<WriteBufferFromFileBase>
@ -180,7 +177,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
[this, path, buf_size, mode]()
{
/// Copy file from cache to actual disk when cached buffer is finalized.
auto src_buffer = cache_disk->readFile(path, buf_size, 0, 0, 0, nullptr);
auto src_buffer = cache_disk->readFile(path, ReadSettings(), 0);
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
copyData(*src_buffer, *dst_buffer);
dst_buffer->finalize();

View File

@ -36,11 +36,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -115,9 +115,9 @@ void DiskDecorator::listFiles(const String & path, std::vector<String> & file_na
std::unique_ptr<ReadBufferFromFileBase>
DiskDecorator::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
const String & path, const ReadSettings & settings, size_t estimated_size) const
{
return delegate->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return delegate->readFile(path, settings, estimated_size);
}
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -37,11 +37,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -64,7 +61,8 @@ public:
void sync(int fd) const;
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType::Type getType() const override { return delegate->getType(); }
DiskType getType() const override { return delegate->getType(); }
bool isRemote() const override { return delegate->isRemote(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -238,18 +238,15 @@ void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk>
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
const ReadSettings & settings,
size_t estimated_size) const
{
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
auto settings = current_settings.get();
auto buffer = delegate->readFile(wrapped_path, settings, estimated_size);
auto encryption_settings = current_settings.get();
FileEncryption::Header header = readHeader(*buffer);
String key = getKey(path, header, *settings);
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header);
String key = getKey(path, header, *encryption_settings);
return std::make_unique<ReadBufferFromEncryptedFile>(settings.local_fs_buffer_size, std::move(buffer), key, header);
}
std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String & path, size_t buf_size, WriteMode mode)
@ -265,7 +262,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String &
if (old_file_size)
{
/// Append mode: we continue to use the same header.
auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize);
auto read_buffer = delegate->readFile(wrapped_path, ReadSettings().adjustBufferSize(FileEncryption::Header::kSize));
header = readHeader(*read_buffer);
key = getKey(path, header, *settings);
}

View File

@ -121,11 +121,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -215,7 +212,8 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType::Type getType() const override { return DiskType::Type::Encrypted; }
DiskType getType() const override { return DiskType::Encrypted; }
bool isRemote() const override { return delegate->isRemote(); }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -259,11 +259,9 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path)
fs::rename(from_file, to_file);
}
std::unique_ptr<ReadBufferFromFileBase>
DiskLocal::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, const ReadSettings & settings, size_t estimated_size) const
{
return createReadBufferFromFileBase(fs::path(disk_path) / path, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache, buf_size);
return createReadBufferFromFileBase(fs::path(disk_path) / path, settings, estimated_size);
}
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -73,11 +73,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -99,7 +96,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType::Type getType() const override { return DiskType::Type::Local; }
DiskType getType() const override { return DiskType::Local; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }

View File

@ -313,7 +313,7 @@ void DiskMemory::replaceFileImpl(const String & from_path, const String & to_pat
files.insert(std::move(node));
}
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, const ReadSettings &, size_t) const
{
std::lock_guard lock(mutex);

View File

@ -64,11 +64,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -90,7 +87,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType::Type getType() const override { return DiskType::Type::RAM; }
DiskType getType() const override { return DiskType::RAM; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }

View File

@ -187,11 +187,10 @@ void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file
}
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache)
const
const String & path, const ReadSettings & settings, size_t estimated_size) const
{
ReadLock lock (mutex);
auto impl = DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
auto impl = DiskDecorator::readFile(path, settings, estimated_size);
return std::make_unique<RestartAwareReadBuffer>(*this, std::move(impl));
}

View File

@ -45,11 +45,8 @@ public:
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;

View File

@ -5,37 +5,34 @@
namespace DB
{
struct DiskType
enum class DiskType
{
enum class Type
{
Local,
RAM,
S3,
HDFS,
Encrypted,
WebServer
};
static String toString(Type disk_type)
{
switch (disk_type)
{
case Type::Local:
return "local";
case Type::RAM:
return "memory";
case Type::S3:
return "s3";
case Type::HDFS:
return "hdfs";
case Type::Encrypted:
return "encrypted";
case Type::WebServer:
return "web";
}
__builtin_unreachable();
}
Local,
RAM,
S3,
HDFS,
Encrypted,
WebServer,
};
inline String toString(DiskType disk_type)
{
switch (disk_type)
{
case DiskType::Local:
return "local";
case DiskType::RAM:
return "memory";
case DiskType::S3:
return "s3";
case DiskType::HDFS:
return "hdfs";
case DiskType::Encrypted:
return "encrypted";
case DiskType::WebServer:
return "web";
}
__builtin_unreachable();
}
}

View File

@ -226,7 +226,7 @@ bool DiskWebServer::exists(const String & path) const
}
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
LOG_DEBUG(log, "Read from file by path: {}", path);
@ -237,7 +237,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
RemoteMetadata meta(uri, fs::path(path).parent_path() / fs::path(path).filename());
meta.remote_fs_objects.emplace_back(std::make_pair(getFileName(path), file.size));
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, buf_size);
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}

View File

@ -107,14 +107,14 @@ public:
String getFileName(const String & path) const;
DiskType::Type getType() const override { return DiskType::Type::WebServer; }
DiskType getType() const override { return DiskType::WebServer; }
bool isRemote() const override { return true; }
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
/// Disk info
const String & getName() const final override { return name; }

View File

@ -93,15 +93,15 @@ DiskHDFS::DiskHDFS(
}
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
auto metadata = readMeta(path);
LOG_DEBUG(log,
LOG_TRACE(log,
"Read from file by path: {}. Existing HDFS objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
@ -114,7 +114,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
auto file_name = getRandomName();
auto hdfs_path = remote_fs_root_path + file_name;
LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
LOG_TRACE(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
backQuote(metadata_path + path), hdfs_path);
/// Single O_WRONLY in libhdfs adds O_TRUNC

View File

@ -42,17 +42,15 @@ public:
const String & metadata_path_,
const Poco::Util::AbstractConfiguration & config_);
DiskType::Type getType() const override { return DiskType::Type::HDFS; }
DiskType getType() const override { return DiskType::HDFS; }
bool isRemote() const override { return true; }
bool supportZeroCopyReplication() const override { return true; }
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -8,24 +8,34 @@
#include <Common/Exception.h>
#include <Disks/Executor.h>
#include <Disks/DiskType.h>
#include <IO/ReadSettings.h>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include "Poco/Util/AbstractConfiguration.h"
#include <Poco/Timestamp.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
class IDiskDirectoryIterator;
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
@ -155,11 +165,8 @@ public:
/// Open the file for read and return ReadBufferFromFileBase object.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t estimated_size = 0,
size_t direct_io_threshold = 0,
size_t mmap_threshold = 0,
MMappedFileCache * mmap_cache = nullptr) const = 0;
const ReadSettings & settings = ReadSettings{},
size_t estimated_size = 0) const = 0;
/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
@ -210,7 +217,10 @@ public:
virtual void truncateFile(const String & path, size_t size);
/// Return disk type - "local", "s3", etc.
virtual DiskType::Type getType() const = 0;
virtual DiskType getType() const = 0;
/// Involves network interaction.
virtual bool isRemote() const = 0;
/// Whether this disk support zero-copy replication.
/// Overrode in remote fs disks.
@ -240,7 +250,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) { }
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
protected:
friend class DiskDecorator;

View File

@ -39,7 +39,6 @@ public:
/// mutations files
virtual DiskPtr getAnyDisk() const = 0;
virtual DiskPtr getDiskByName(const String & disk_name) const = 0;
virtual Disks getDisksByType(DiskType::Type type) const = 0;
/// Get free space from most free disk
virtual UInt64 getMaxUnreservedFreeSpace() const = 0;
/// Reserves space on any volume with index > min_volume_index or returns nullptr

View File

@ -184,7 +184,7 @@ void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
if (s3_paths_keeper)
s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
{
LOG_DEBUG(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk));
LOG_TRACE(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk));
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(chunk);
/// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
@ -221,15 +221,16 @@ void DiskS3::moveFile(const String & from_path, const String & to_path, bool sen
fs::rename(fs::path(metadata_path) / from_path, fs::path(metadata_path) / to_path);
}
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
LOG_TRACE(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromS3>(
settings->client, bucket, metadata, settings->s3_max_single_read_retries, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
@ -251,7 +252,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
s3_path = "r" + revisionToString(revision) + "-file-" + s3_path;
}
LOG_DEBUG(log, "{} to file by path: {}. S3 path: {}",
LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + s3_path);
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
@ -351,7 +352,7 @@ void DiskS3::findLastRevision()
{
auto revision_prefix = revision + "1";
LOG_DEBUG(log, "Check object exists with revision prefix {}", revision_prefix);
LOG_TRACE(log, "Check object exists with revision prefix {}", revision_prefix);
/// Check file or operation with such revision prefix exists.
if (checkObjectExists(bucket, remote_fs_root_path + "r" + revision_prefix)
@ -405,7 +406,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_DEBUG(log, "Migrate file {} to restorable schema", metadata_path + path);
LOG_TRACE(log, "Migrate file {} to restorable schema", metadata_path + path);
auto meta = readMeta(path);
@ -422,7 +423,7 @@ void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & r
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_DEBUG(log, "Migrate directory {} to restorable schema", metadata_path + path);
LOG_TRACE(log, "Migrate directory {} to restorable schema", metadata_path + path);
bool dir_contains_only_files = true;
for (auto it = iterateDirectory(path); it->isValid(); it->next())
@ -595,7 +596,7 @@ void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & s
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
{
LOG_DEBUG(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
LOG_TRACE(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
src_bucket, src_key, dst_bucket, dst_key, metadata ? "REPLACE" : "NOT_SET");
auto settings = current_settings.get();
@ -669,7 +670,7 @@ void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & s
throwIfError(outcome);
LOG_DEBUG(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
LOG_TRACE(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
"Upload_id: {}, Parts: {}", src_bucket, src_key, dst_bucket, dst_key, multipart_upload_id, part_tags.size());
}
}
@ -871,7 +872,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();
LOG_DEBUG(log, "Restored file {}", path);
LOG_TRACE(log, "Restored file {}", path);
}
}
@ -918,7 +919,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
if (exists(from_path))
{
moveFile(from_path, to_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
LOG_TRACE(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
if (restore_information.detached && isDirectory(to_path))
{
@ -945,7 +946,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
{
createDirectories(directoryPath(dst_path));
createHardLink(src_path, dst_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
LOG_TRACE(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
}
}
}
@ -976,7 +977,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
auto detached_path = pathToDetached(path);
LOG_DEBUG(log, "Move directory to 'detached' {} -> {}", path, detached_path);
LOG_TRACE(log, "Move directory to 'detached' {} -> {}", path, detached_path);
fs::path from_path = fs::path(metadata_path) / path;
fs::path to_path = fs::path(metadata_path) / detached_path;

View File

@ -76,11 +76,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -97,7 +94,8 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);
DiskType::Type getType() const override { return DiskType::Type::S3; }
DiskType getType() const override { return DiskType::S3; }
bool isRemote() const override { return true; }
bool supportZeroCopyReplication() const override { return true; }

View File

@ -39,7 +39,7 @@ void checkWriteAccess(IDisk & disk)
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
auto file = disk.readFile("test_acl");
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")

View File

@ -157,17 +157,6 @@ Disks StoragePolicy::getDisks() const
}
Disks StoragePolicy::getDisksByType(DiskType::Type type) const
{
Disks res;
for (const auto & volume : volumes)
for (const auto & disk : volume->getDisks())
if (disk->getType() == type)
res.push_back(disk);
return res;
}
DiskPtr StoragePolicy::getAnyDisk() const
{
/// StoragePolicy must contain at least one Volume

View File

@ -47,9 +47,6 @@ public:
/// Returns disks ordered by volumes priority
Disks getDisks() const override;
/// Returns disks by type ordered by volumes priority
Disks getDisksByType(DiskType::Type type) const override;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files

View File

@ -53,7 +53,7 @@ TEST(DiskTestHDFS, WriteReadHDFS)
{
DB::String result;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
auto in = disk.readFile(file_name, {}, 1024);
readString(result, *in);
EXPECT_EQ("Test write to file", result);
}
@ -76,7 +76,7 @@ TEST(DiskTestHDFS, RewriteFileHDFS)
{
String result;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
auto in = disk.readFile(file_name, {}, 1024);
readString(result, *in);
EXPECT_EQ("Text10", result);
readString(result, *in);
@ -104,7 +104,7 @@ TEST(DiskTestHDFS, AppendFileHDFS)
{
String result, expected;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
auto in = disk.readFile(file_name, {}, 1024);
readString(result, *in);
EXPECT_EQ("Text0123456789", result);
@ -131,7 +131,7 @@ TEST(DiskTestHDFS, SeekHDFS)
/// Test SEEK_SET
{
String buf(4, '0');
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024);
in->seek(5, SEEK_SET);
@ -141,7 +141,7 @@ TEST(DiskTestHDFS, SeekHDFS)
/// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024);
String buf(4, '0');
in->readStrict(buf.data(), 4);

View File

@ -22,6 +22,7 @@ SRCS(
IVolume.cpp
LocalDirectorySyncGuard.cpp
ReadIndirectBufferFromRemoteFS.cpp
ReadIndirectBufferFromWebServer.cpp
SingleDiskVolume.cpp
StoragePolicy.cpp
TemporaryFileOnDisk.cpp

View File

@ -6,6 +6,7 @@
# include <sys/syscall.h>
# include <unistd.h>
# include <utility>
/** Small wrappers for asynchronous I/O.
@ -50,7 +51,19 @@ AIOContext::AIOContext(unsigned int nr_events)
AIOContext::~AIOContext()
{
io_destroy(ctx);
if (ctx)
io_destroy(ctx);
}
AIOContext::AIOContext(AIOContext && rhs)
{
*this = std::move(rhs);
}
AIOContext & AIOContext::operator=(AIOContext && rhs)
{
std::swap(ctx, rhs.ctx);
return *this;
}
#elif defined(OS_FREEBSD)

View File

@ -33,10 +33,13 @@ int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event * events,
struct AIOContext : private boost::noncopyable
{
aio_context_t ctx;
aio_context_t ctx = 0;
AIOContext(unsigned int nr_events = 128);
AIOContext() {}
AIOContext(unsigned int nr_events);
~AIOContext();
AIOContext(AIOContext && rhs);
AIOContext & operator=(AIOContext && rhs);
};
#elif defined(OS_FREEBSD)

View File

@ -1,172 +0,0 @@
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <Common/MemorySanitizer.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <errno.h>
#include <IO/AIOContextPool.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
}
AIOContextPool::~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
io_completion_monitor.join();
}
void AIOContextPool::doMonitor()
{
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void AIOContextPool::waitForCompletion()
{
/// array to hold completion events
std::vector<io_event> events(max_concurrent_events);
try
{
const auto num_events = getCompletionEvents(events.data(), max_concurrent_events);
fulfillPromises(events.data(), num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::waitForCompletion()");
}
}
int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) const
{
timespec timeout{timeout_sec, 0};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);
/// Unpoison the memory returned from a non-instrumented system call.
__msan_unpoison(events, sizeof(*events) * num_events);
return num_events;
}
void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
#if defined(__FreeBSD__)
const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data;
#else
const auto completed_id = event.data;
#endif
/// set value via promise and release it
const auto it = promises.find(completed_id);
if (it == std::end(promises))
{
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id {}", completed_id);
continue;
}
#if defined(__FreeBSD__)
it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
#else
it->second.set_value(event.res);
#endif
promises.erase(it);
}
}
void AIOContextPool::notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void AIOContextPool::reportExceptionToAnyProducer()
{
const std::lock_guard lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
{
std::unique_lock lock{mutex};
/// get current id and increment it by one
const auto request_id = next_id;
++next_id;
/// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<BytesRead>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
struct iocb * requests[] { &iocb };
/// submit a request
while (io_submit(aio_context.ctx, 1, requests) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
return promises[request_id].get_future();
}
AIOContextPool & AIOContextPool::instance()
{
static AIOContextPool instance;
return instance;
}
}
#endif

View File

@ -1,53 +0,0 @@
#pragma once
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <condition_variable>
#include <future>
#include <mutex>
#include <map>
#include <IO/AIO.h>
#include <Common/ThreadPool.h>
namespace DB
{
class AIOContextPool : private boost::noncopyable
{
static const auto max_concurrent_events = 128;
static const auto timeout_sec = 1;
AIOContext aio_context{max_concurrent_events};
using ID = size_t;
using BytesRead = ssize_t;
/// Autoincremental id used to identify completed requests
ID next_id{};
mutable std::mutex mutex;
mutable std::condition_variable have_resources;
std::map<ID, std::promise<BytesRead>> promises;
std::atomic<bool> cancelled{false};
ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool();
void doMonitor();
void waitForCompletion();
int getCompletionEvents(io_event events[], const int max_events) const;
void fulfillPromises(const io_event events[], const int num_events);
void notifyProducers(const int num_producers) const;
void reportExceptionToAnyProducer();
public:
static AIOContextPool & instance();
/// Request AIO read operation for iocb, returns a future with number of bytes read
std::future<BytesRead> post(struct iocb & iocb);
};
}
#endif

View File

@ -0,0 +1,106 @@
#include <fcntl.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/ProfileEvents.h>
#include <errno.h>
namespace ProfileEvents
{
extern const Event FileOpen;
}
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_CLOSE_FILE;
}
AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_,
Int32 priority_,
const std::string & file_name_,
size_t buf_size,
int flags,
char * existing_memory,
size_t alignment)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment), file_name(file_name_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
#ifdef __APPLE__
bool o_direct = (flags != -1) && (flags & O_DIRECT);
if (o_direct)
flags = flags & ~O_DIRECT;
#endif
fd = ::open(file_name.c_str(), flags == -1 ? O_RDONLY | O_CLOEXEC : flags | O_CLOEXEC);
if (-1 == fd)
throwFromErrnoWithPath("Cannot open file " + file_name, file_name,
errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
#ifdef __APPLE__
if (o_direct)
{
if (fcntl(fd, F_NOCACHE, 1) == -1)
throwFromErrnoWithPath("Cannot set F_NOCACHE on file " + file_name, file_name, ErrorCodes::CANNOT_OPEN_FILE);
}
#endif
}
AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_,
Int32 priority_,
int & fd_,
const std::string & original_file_name,
size_t buf_size,
char * existing_memory,
size_t alignment)
:
AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, fd_, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{
fd_ = -1;
}
AsynchronousReadBufferFromFile::~AsynchronousReadBufferFromFile()
{
/// Must wait for events in flight before closing the file.
finalize();
if (fd < 0)
return;
::close(fd);
}
void AsynchronousReadBufferFromFile::close()
{
if (fd < 0)
return;
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
}
AsynchronousReadBufferFromFileWithDescriptorsCache::~AsynchronousReadBufferFromFileWithDescriptorsCache()
{
/// Must wait for events in flight before potentially closing the file by destroying OpenedFilePtr.
finalize();
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/OpenedFileCache.h>
namespace DB
{
class AsynchronousReadBufferFromFile : public AsynchronousReadBufferFromFileDescriptor
{
protected:
std::string file_name;
public:
explicit AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_, Int32 priority_,
const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0);
/// Use pre-opened file descriptor.
explicit AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_, Int32 priority_,
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, size_t alignment = 0);
~AsynchronousReadBufferFromFile() override;
/// Close file before destruction of object.
void close();
std::string getFileName() const override
{
return file_name;
}
};
/** Similar to AsynchronousReadBufferFromFile but also transparently shares open file descriptors.
*/
class AsynchronousReadBufferFromFileWithDescriptorsCache : public AsynchronousReadBufferFromFileDescriptor
{
private:
std::string file_name;
OpenedFileCache::OpenedFilePtr file;
public:
AsynchronousReadBufferFromFileWithDescriptorsCache(
AsynchronousReaderPtr reader_, Int32 priority_,
const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment),
file_name(file_name_)
{
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();
}
~AsynchronousReadBufferFromFileWithDescriptorsCache() override;
std::string getFileName() const override
{
return file_name;
}
};
}

View File

@ -0,0 +1,204 @@
#include <errno.h>
#include <time.h>
#include <optional>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event AsynchronousReadWaitMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric AsynchronousReadWait;
}
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
{
return "(fd = " + toString(fd) + ")";
}
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::readInto(char * data, size_t size)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = priority;
return reader->submit(request);
}
void AsynchronousReadBufferFromFileDescriptor::prefetch()
{
if (prefetch_future.valid())
return;
/// Will request the same amount of data that is read in nextImpl.
prefetch_buffer.resize(internal_buffer.size());
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
}
bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
{
if (prefetch_future.valid())
{
/// Read request already in flight. Wait for its completion.
size_t size = 0;
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
size = prefetch_future.get();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
prefetch_future = {};
file_offset_of_buffer_end += size;
if (size)
{
prefetch_buffer.swap(memory);
set(memory.data(), memory.size());
working_buffer.resize(size);
return true;
}
return false;
}
else
{
/// No pending request. Do synchronous read.
auto size = readInto(memory.data(), memory.size()).get();
file_offset_of_buffer_end += size;
if (size)
{
set(memory.data(), memory.size());
working_buffer.resize(size);
return true;
}
return false;
}
}
void AsynchronousReadBufferFromFileDescriptor::finalize()
{
if (prefetch_future.valid())
{
prefetch_future.wait();
prefetch_future = {};
}
}
AsynchronousReadBufferFromFileDescriptor::~AsynchronousReadBufferFromFileDescriptor()
{
finalize();
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
size_t new_pos;
if (whence == SEEK_SET)
{
assert(offset >= 0);
new_pos = offset;
}
else if (whence == SEEK_CUR)
{
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
}
else
{
throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos;
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return new_pos;
}
else
{
if (prefetch_future.valid())
{
//std::cerr << "Ignoring prefetched data" << "\n";
prefetch_future.wait();
prefetch_future = {};
}
/// Position is out of the buffer, we need to do real seek.
off_t seek_pos = required_alignment > 1
? new_pos / required_alignment * required_alignment
: new_pos;
off_t offset_after_seek_pos = new_pos - seek_pos;
/// First put position at the end of the buffer so the next read will fetch new data to the buffer.
pos = working_buffer.end();
/// Just update the info about the next position in file.
file_offset_of_buffer_end = seek_pos;
if (offset_after_seek_pos > 0)
ignore(offset_after_seek_pos);
return seek_pos;
}
}
void AsynchronousReadBufferFromFileDescriptor::rewind()
{
if (prefetch_future.valid())
{
prefetch_future.wait();
prefetch_future = {};
}
/// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'.
working_buffer.resize(0);
pos = working_buffer.begin();
file_offset_of_buffer_end = 0;
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
#include <IO/AsynchronousReader.h>
#include <Interpreters/Context.h>
#include <optional>
#include <unistd.h>
namespace DB
{
/** Use ready file descriptor. Does not open or close a file.
*/
class AsynchronousReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
AsynchronousReaderPtr reader;
Int32 priority;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
int fd;
bool nextImpl() override;
/// Name or some description of file.
std::string getFileName() const override;
void finalize();
public:
AsynchronousReadBufferFromFileDescriptor(
AsynchronousReaderPtr reader_, Int32 priority_,
int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
{
}
~AsynchronousReadBufferFromFileDescriptor() override;
void prefetch() override;
int getFD() const
{
return fd;
}
off_t getPosition() override
{
return file_offset_of_buffer_end - (working_buffer.end() - pos);
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t seek(off_t off, int whence) override;
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
void rewind();
private:
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
};
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <Core/Types.h>
#include <optional>
#include <memory>
#include <future>
namespace DB
{
/** Interface for asynchronous reads from file descriptors.
* It can abstract Linux AIO, io_uring or normal reads from separate thread pool,
* and also reads from non-local filesystems.
* The implementation not necessarily to be efficient for large number of small requests,
* instead it should be ok for moderate number of sufficiently large requests
* (e.g. read 1 MB of data 50 000 times per seconds; BTW this is normal performance for reading from page cache).
* For example, this interface may not suffice if you want to serve 10 000 000 of 4 KiB requests per second.
* This interface is fairly limited.
*/
class IAsynchronousReader
{
public:
/// For local filesystems, the file descriptor is simply integer
/// but it can be arbitrary opaque object for remote filesystems.
struct IFileDescriptor
{
virtual ~IFileDescriptor() = default;
};
using FileDescriptorPtr = std::shared_ptr<IFileDescriptor>;
struct LocalFileDescriptor : public IFileDescriptor
{
LocalFileDescriptor(int fd_) : fd(fd_) {}
int fd;
};
/// Read from file descriptor at specified offset up to size bytes into buf.
/// Some implementations may require alignment and it is responsibility of
/// the caller to provide conforming requests.
struct Request
{
FileDescriptorPtr descriptor;
size_t offset = 0;
size_t size = 0;
char * buf = nullptr;
int64_t priority = 0;
};
/// Less than requested amount of data can be returned.
/// If size is zero - the file has ended.
/// (for example, EINTR must be handled by implementation automatically)
using Result = size_t;
/// Submit request and obtain a handle. This method don't perform any waits.
/// If this method did not throw, the caller must wait for the result with 'wait' method
/// or destroy the whole reader before destroying the buffer for request.
/// The method can be called concurrently from multiple threads.
virtual std::future<Result> submit(Request request) = 0;
/// Destructor must wait for all not completed request and ignore the results.
/// It may also cancel the requests.
virtual ~IAsynchronousReader() = default;
};
using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
}

View File

@ -48,18 +48,22 @@ struct Memory : boost::noncopyable, Allocator
dealloc();
}
Memory(Memory && rhs) noexcept
{
*this = std::move(rhs);
}
Memory & operator=(Memory && rhs) noexcept
void swap(Memory & rhs) noexcept
{
std::swap(m_capacity, rhs.m_capacity);
std::swap(m_size, rhs.m_size);
std::swap(m_data, rhs.m_data);
std::swap(alignment, rhs.alignment);
}
Memory(Memory && rhs) noexcept
{
swap(rhs);
}
Memory & operator=(Memory && rhs) noexcept
{
swap(rhs);
return *this;
}

View File

@ -65,6 +65,12 @@ public:
it->second = res;
return res;
}
static OpenedFileCache & instance()
{
static OpenedFileCache res;
return res;
}
};
using OpenedFileCachePtr = std::shared_ptr<OpenedFileCache>;

View File

@ -197,6 +197,11 @@ public:
return read(to, n);
}
/** Do something to allow faster subsequent call to 'nextImpl' if possible.
* It's used for asynchronous readers with double-buffering.
*/
virtual void prefetch() {}
protected:
/// The number of bytes to ignore from the initial position of `working_buffer`
/// buffer. Apparently this is an additional out-parameter for nextImpl(),

View File

@ -88,7 +88,4 @@ void ReadBufferFromFile::close()
metric_increment.destroy();
}
OpenedFileCache ReadBufferFromFilePReadWithCache::cache;
}

View File

@ -4,10 +4,6 @@
#include <IO/OpenedFileCache.h>
#include <Common/CurrentMetrics.h>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace CurrentMetrics
{
@ -65,21 +61,19 @@ public:
/** Similar to ReadBufferFromFilePRead but also transparently shares open file descriptors.
*/
class ReadBufferFromFilePReadWithCache : public ReadBufferFromFileDescriptorPRead
class ReadBufferFromFilePReadWithDescriptorsCache : public ReadBufferFromFileDescriptorPRead
{
private:
static OpenedFileCache cache;
std::string file_name;
OpenedFileCache::OpenedFilePtr file;
public:
ReadBufferFromFilePReadWithCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
ReadBufferFromFilePReadWithDescriptorsCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment),
file_name(file_name_)
{
file = cache.get(file_name, flags);
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();
}

View File

@ -7,8 +7,14 @@
#include <functional>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace DB
{

View File

@ -6,13 +6,9 @@
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <sys/stat.h>
#include <Common/UnicodeBar.h>
#include <Common/TerminalSize.h>
#include <IO/Operators.h>
#include <IO/Progress.h>
#include <sys/stat.h>
namespace ProfileEvents
@ -39,6 +35,7 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_SELECT;
extern const int CANNOT_FSTAT;
extern const int CANNOT_ADVISE;
}
@ -111,6 +108,20 @@ bool ReadBufferFromFileDescriptor::nextImpl()
}
void ReadBufferFromFileDescriptor::prefetch()
{
#if defined(POSIX_FADV_WILLNEED)
/// For direct IO, loading data into page cache is pointless.
if (required_alignment)
return;
/// Ask OS to prefetch data into page cache.
if (0 != posix_fadvise(fd, file_offset_of_buffer_end, internal_buffer.size(), POSIX_FADV_WILLNEED))
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
#endif
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
@ -133,16 +144,15 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos;
/// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos,
/// so the second inequality is strict.
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos < file_offset_of_buffer_end)
&& new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end());
assert(pos <= working_buffer.end());
return new_pos;
}

View File

@ -21,6 +21,7 @@ protected:
int fd;
bool nextImpl() override;
void prefetch() override;
/// Name or some description of file.
std::string getFileName() const override;

View File

@ -76,7 +76,7 @@ bool ReadBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
LOG_DEBUG(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message());
if (attempt + 1 == max_single_read_retries)

32
src/IO/ReadSettings.cpp Normal file
View File

@ -0,0 +1,32 @@
#include <IO/ReadSettings.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_READ_METHOD;
}
const char * toString(ReadMethod read_method)
{
switch (read_method)
{
#define CASE_READ_METHOD(NAME) case ReadMethod::NAME: return #NAME;
FOR_EACH_READ_METHOD(CASE_READ_METHOD)
#undef CASE_READ_METHOD
}
__builtin_unreachable();
}
ReadMethod parseReadMethod(const std::string & name)
{
#define CASE_READ_METHOD(NAME) if (name == #NAME) return ReadMethod::NAME;
FOR_EACH_READ_METHOD(CASE_READ_METHOD)
#undef CASE_READ_METHOD
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}'", name);
}
}

80
src/IO/ReadSettings.h Normal file
View File

@ -0,0 +1,80 @@
#pragma once
#include <cstddef>
#include <string>
#include <Core/Defines.h>
namespace DB
{
#define FOR_EACH_READ_METHOD(M) \
/** Simple synchronous reads with 'read'. \
Can use direct IO after specified size. Can use prefetch by asking OS to perform readahead. */ \
M(read) \
\
/** Simple synchronous reads with 'pread'. \
In contrast to 'read', shares single file descriptor from multiple threads. \
Can use direct IO after specified size. Can use prefetch by asking OS to perform readahead. */ \
M(pread) \
\
/** Use mmap after specified size or simple synchronous reads with 'pread'. \
Can use prefetch by asking OS to perform readahead. */ \
M(mmap) \
\
/** Checks if data is in page cache with 'preadv2' on modern Linux kernels. \
If data is in page cache, read from the same thread. \
If not, offload IO to separate threadpool. \
Can do prefetch with double buffering. \
Can use specified priorities and limit the number of concurrent reads. */ \
M(pread_threadpool) \
\
/** It's using asynchronous reader with fake backend that in fact synchronous. \
Only used for testing purposes. */ \
M(pread_fake_async) \
enum class ReadMethod
{
#define DEFINE_READ_METHOD(NAME) NAME,
FOR_EACH_READ_METHOD(DEFINE_READ_METHOD)
#undef DEFINE_READ_METHOD
};
const char * toString(ReadMethod read_method);
ReadMethod parseReadMethod(const std::string & name);
class MMappedFileCache;
struct ReadSettings
{
/// Method to use reading from local filesystem.
ReadMethod local_fs_method = ReadMethod::pread;
size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
bool local_fs_prefetch = false;
bool remote_fs_prefetch = false;
/// For 'read', 'pread' and 'pread_threadpool' methods.
size_t direct_io_threshold = 0;
/// For 'mmap' method.
size_t mmap_threshold = 0;
MMappedFileCache * mmap_cache = nullptr;
/// For 'pread_threadpool' method. Lower is more priority.
size_t priority = 0;
ReadSettings adjustBufferSize(size_t file_size) const
{
ReadSettings res = *this;
res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size);
res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size);
return res;
}
};
}

View File

@ -7,7 +7,9 @@ namespace DB
SeekAvoidingReadBuffer::SeekAvoidingReadBuffer(std::unique_ptr<ReadBufferFromFileBase> impl_, UInt64 min_bytes_for_seek_)
: ReadBufferFromFileDecorator(std::move(impl_))
, min_bytes_for_seek(min_bytes_for_seek_)
{ }
{
}
off_t SeekAvoidingReadBuffer::seek(off_t off, int whence)
{

View File

@ -0,0 +1,89 @@
#include <IO/SynchronousReader.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <common/errnoToString.h>
#include <unordered_map>
#include <mutex>
#include <unistd.h>
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event DiskReadElapsedMicroseconds;
extern const Event Seek;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int CANNOT_ADVISE;
}
std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request request)
{
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
#if defined(POSIX_FADV_WILLNEED)
if (0 != posix_fadvise(fd, request.offset, request.size, POSIX_FADV_WILLNEED))
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
#endif
return std::async(std::launch::deferred, [fd, request]
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
Stopwatch watch(CLOCK_MONOTONIC);
size_t bytes_read = 0;
while (!bytes_read)
{
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::pread(fd, request.buf, request.size, request.offset);
}
if (!res)
break;
if (-1 == res && errno != EINTR)
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (res > 0)
bytes_read += res;
}
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
/// (TaskStatsInfoGetter has about 500K RPS).
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return bytes_read;
});
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <IO/AsynchronousReader.h>
namespace DB
{
/** Implementation of IAsynchronousReader that in fact synchronous.
* The only addition is posix_fadvise.
*/
class SynchronousReader final : public IAsynchronousReader
{
public:
std::future<Result> submit(Request request) override;
};
}

Some files were not shown because too many files have changed in this diff Show More