resolving conflicts

Yakov Olkhovskiy 2022-06-13 11:41:32 -04:00
commit ce395dc68d
36 changed files with 929 additions and 442 deletions

View File

@ -77,6 +77,7 @@ if (OS_LINUX AND NOT LINKER_NAME)
if (NOT LINKER_NAME)
if (GOLD_PATH)
message (WARNING "Linking with gold is not recommended. Please use lld.")
if (COMPILER_GCC)
set (LINKER_NAME "gold")
else ()

View File

@ -76,9 +76,7 @@ message (STATUS "LLVM library Directory: ${LLVM_LIBRARY_DIRS}")
message (STATUS "LLVM C++ compiler flags: ${LLVM_CXXFLAGS}")
# ld: unknown option: --color-diagnostics
if (APPLE)
set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "")
endif ()
set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "")
# Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind
set (CMAKE_INSTALL_RPATH "ON")

View File

@ -21,7 +21,9 @@ By default, starting above server instance will be run as default user without p
### connect to it from a native client
```bash
$ docker run -it --rm --link some-clickhouse-server:clickhouse-server clickhouse/clickhouse-client --host clickhouse-server
$ docker run -it --rm --link some-clickhouse-server:clickhouse-server --entrypoint clickhouse-client clickhouse/clickhouse-server --host clickhouse-server
# OR
$ docker exec -it some-clickhouse-server clickhouse-client
```
More information about [ClickHouse client](https://clickhouse.com/docs/en/interfaces/cli/).

View File

@ -7,22 +7,12 @@ RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
python3-requests \
llvm-9
&& apt-get clean
COPY s3downloader /s3downloader
ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com"
ENV DATASETS="hits visits"
ENV EXPORT_S3_STORAGE_POLICIES=1
# Download Minio-related binaries
RUN arch=${TARGETARCH:-amd64} \
&& if [ "$arch" = "amd64" ] ; then wget "https://dl.min.io/server/minio/release/linux-${arch}/archive/minio-20220103182258.0.0.x86_64.rpm"; else wget "https://dl.min.io/server/minio/release/linux-${arch}/archive/minio-20220103182258.0.0.aarch64.rpm" ; fi \
&& wget "https://dl.min.io/client/mc/release/linux-${arch}/mc" \
&& chmod +x ./mc
ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse"
COPY setup_minio.sh /
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]

View File

@ -17,7 +17,7 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
# install test configs
/usr/share/clickhouse-test/config/install.sh
./setup_minio.sh
./setup_minio.sh stateful
function start()
{

View File

@ -1,77 +0,0 @@
#!/bin/bash
# TODO: Make this file shared with stateless tests
#
# Usage for local run:
#
# ./docker/test/stateful/setup_minio.sh ./tests/
#
set -e -x -a -u
rpm2cpio ./minio-20220103182258.0.0.*.rpm | cpio -i --make-directories
find / -name minio
cp ./usr/local/bin/minio ./
ls -lha
mkdir -p ./minio_data
if [ ! -f ./minio ]; then
echo 'MinIO binary not found, downloading...'
BINARY_TYPE=$(uname -s | tr '[:upper:]' '[:lower:]')
wget "https://dl.min.io/server/minio/release/${BINARY_TYPE}-amd64/minio" \
&& chmod +x ./minio \
&& wget "https://dl.min.io/client/mc/release/${BINARY_TYPE}-amd64/mc" \
&& chmod +x ./mc
fi
MINIO_ROOT_USER=${MINIO_ROOT_USER:-clickhouse}
MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD:-clickhouse}
./minio --version
./minio server --address ":11111" ./minio_data &
i=0
while ! curl -v --silent http://localhost:11111 2>&1 | grep AccessDenied
do
if [[ $i == 60 ]]; then
echo "Failed to setup minio"
exit 0
fi
echo "Trying to connect to minio"
sleep 1
i=$((i + 1))
done
lsof -i :11111
sleep 5
./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
# Upload data to Minio. By default, after unpacking, all tests will be in
# /usr/share/clickhouse-test/queries
TEST_PATH=${1:-/usr/share/clickhouse-test}
MINIO_DATA_PATH=${TEST_PATH}/queries/1_stateful/data_minio
# Iterating over globs will cause redundant FILE variable to be a path to a file, not a filename
# shellcheck disable=SC2045
for FILE in $(ls "${MINIO_DATA_PATH}"); do
echo "$FILE";
./mc cp "${MINIO_DATA_PATH}"/"$FILE" clickminio/test/"$FILE";
done
mkdir -p ~/.aws
cat <<EOT >> ~/.aws/credentials
[default]
aws_access_key_id=clickhouse
aws_secret_access_key=clickhouse
EOT

View File

@ -0,0 +1 @@
../stateless/setup_minio.sh

View File

@ -5,37 +5,36 @@ FROM clickhouse/test-base:$FROM_TAG
ARG odbc_driver_url="https://github.com/ClickHouse/clickhouse-odbc/releases/download/v1.1.4.20200302/clickhouse-odbc-1.1.4-Linux.tar.gz"
# golang version 1.13 on Ubuntu 20 is enough for tests
RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
awscli \
brotli \
expect \
zstd \
golang \
lsof \
mysql-client=8.0* \
ncdu \
netcat-openbsd \
openjdk-11-jre-headless \
openssl \
postgresql-client \
protobuf-compiler \
python3 \
python3-lxml \
python3-pip \
python3-requests \
python3-termcolor \
python3-pip \
qemu-user-static \
sqlite3 \
sudo \
# golang version 1.13 on Ubuntu 20 is enough for tests
golang \
telnet \
tree \
unixodbc \
wget \
mysql-client=8.0* \
postgresql-client \
sqlite3 \
awscli \
openjdk-11-jre-headless \
rpm2cpio \
cpio
zstd \
&& apt-get clean
RUN pip3 install numpy scipy pandas Jinja2
@ -53,13 +52,17 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV NUM_TRIES=1
ENV MAX_RUN_TIME=0
# Unrelated to the vars in setup_minio.sh, but should be kept the same there
# to have the same binaries for the local running scenario
ARG MINIO_SERVER_VERSION=2022-01-03T18-22-58Z
ARG MINIO_CLIENT_VERSION=2022-01-05T23-52-51Z
ARG TARGETARCH
# Download Minio-related binaries
RUN arch=${TARGETARCH:-amd64} \
&& if [ "$arch" = "amd64" ] ; then wget "https://dl.min.io/server/minio/release/linux-${arch}/archive/minio-20220103182258.0.0.x86_64.rpm"; else wget "https://dl.min.io/server/minio/release/linux-${arch}/archive/minio-20220103182258.0.0.aarch64.rpm" ; fi \
&& wget "https://dl.min.io/client/mc/release/linux-${arch}/mc" \
&& chmod +x ./mc
&& wget "https://dl.min.io/server/minio/release/linux-${arch}/archive/minio.RELEASE.${MINIO_SERVER_VERSION}" -O ./minio \
&& wget "https://dl.min.io/client/mc/release/linux-${arch}/archive/mc.RELEASE.${MINIO_CLIENT_VERSION}" -O ./mc \
&& chmod +x ./mc ./minio
RUN wget 'https://dlcdn.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz' \

View File

@ -18,7 +18,7 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
# install test configs
/usr/share/clickhouse-test/config/install.sh
./setup_minio.sh
./setup_minio.sh stateless
./setup_hdfs_minicluster.sh
# For flaky check we also enable thread fuzzer

View File

@ -1,29 +1,41 @@
#!/bin/bash
# Usage for local run:
#
# ./docker/test/stateless/setup_minio.sh ./tests/
#
USAGE='Usage for local run:
./docker/test/stateless/setup_minio.sh { stateful | stateless } ./tests/
'
set -e -x -a -u
rpm2cpio ./minio-20220103182258.0.0.*.rpm | cpio -i --make-directories
find / -name minio
cp ./usr/local/bin/minio ./
TEST_TYPE="$1"
shift
case $TEST_TYPE in
stateless) QUERY_DIR=0_stateless ;;
stateful) QUERY_DIR=1_stateful ;;
*) echo "unknown test type $TEST_TYPE"; echo "${USAGE}"; exit 1 ;;
esac
ls -lha
mkdir -p ./minio_data
if [ ! -f ./minio ]; then
MINIO_SERVER_VERSION=${MINIO_SERVER_VERSION:-2022-01-03T18-22-58Z}
MINIO_CLIENT_VERSION=${MINIO_CLIENT_VERSION:-2022-01-05T23-52-51Z}
case $(uname -m) in
x86_64) BIN_ARCH=amd64 ;;
aarch64) BIN_ARCH=arm64 ;;
*) echo "unknown architecture $(uname -m)"; exit 1 ;;
esac
echo 'MinIO binary not found, downloading...'
BINARY_TYPE=$(uname -s | tr '[:upper:]' '[:lower:]')
wget "https://dl.min.io/server/minio/release/${BINARY_TYPE}-amd64/minio" \
&& chmod +x ./minio \
&& wget "https://dl.min.io/client/mc/release/${BINARY_TYPE}-amd64/mc" \
&& chmod +x ./mc
wget "https://dl.min.io/server/minio/release/${BINARY_TYPE}-${BIN_ARCH}/archive/minio.RELEASE.${MINIO_SERVER_VERSION}" -O ./minio \
&& wget "https://dl.min.io/client/mc/release/${BINARY_TYPE}-${BIN_ARCH}/archive/mc.RELEASE.${MINIO_CLIENT_VERSION}" -O ./mc \
&& chmod +x ./mc ./minio
fi
MINIO_ROOT_USER=${MINIO_ROOT_USER:-clickhouse}
@ -52,14 +64,16 @@ sleep 5
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
./mc policy set public clickminio/test
if [ "$TEST_TYPE" = "stateless" ]; then
./mc policy set public clickminio/test
fi
# Upload data to Minio. By default, after unpacking, all tests will be in
# /usr/share/clickhouse-test/queries
TEST_PATH=${1:-/usr/share/clickhouse-test}
MINIO_DATA_PATH=${TEST_PATH}/queries/0_stateless/data_minio
MINIO_DATA_PATH=${TEST_PATH}/queries/${QUERY_DIR}/data_minio
# Iterating over globs will cause redundant FILE variable to be a path to a file, not a filename
# shellcheck disable=SC2045
@ -71,6 +85,6 @@ done
mkdir -p ~/.aws
cat <<EOT >> ~/.aws/credentials
[default]
aws_access_key_id=clickhouse
aws_secret_access_key=clickhouse
aws_access_key_id=${MINIO_ROOT_USER}
aws_secret_access_key=${MINIO_ROOT_PASSWORD}
EOT

View File

@ -174,7 +174,7 @@ install_packages package_folder
configure
./setup_minio.sh
./setup_minio.sh stateful # to have a proper environment
start

View File

@ -19,7 +19,7 @@ The following tutorial is based on the Ubuntu Linux system. With appropriate cha
### Install Git, CMake, Python and Ninja {#install-git-cmake-python-and-ninja}
``` bash
sudo apt-get install git cmake python ninja-build
sudo apt-get install git cmake ccache python3 ninja-build
```
Or use cmake3 instead of cmake on older systems.

View File

@ -22,8 +22,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int DUPLICATE_COLUMN;
extern const int NUMBER_OF_DIMENSIONS_MISMATHED;
extern const int NOT_IMPLEMENTED;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace
@ -179,7 +179,7 @@ ColumnObject::Subcolumn::Subcolumn(
{
}
size_t ColumnObject::Subcolumn::Subcolumn::size() const
size_t ColumnObject::Subcolumn::size() const
{
size_t res = num_of_defaults_in_prefix;
for (const auto & part : data)
@ -187,7 +187,7 @@ size_t ColumnObject::Subcolumn::Subcolumn::size() const
return res;
}
size_t ColumnObject::Subcolumn::Subcolumn::byteSize() const
size_t ColumnObject::Subcolumn::byteSize() const
{
size_t res = 0;
for (const auto & part : data)
@ -195,7 +195,7 @@ size_t ColumnObject::Subcolumn::Subcolumn::byteSize() const
return res;
}
size_t ColumnObject::Subcolumn::Subcolumn::allocatedBytes() const
size_t ColumnObject::Subcolumn::allocatedBytes() const
{
size_t res = 0;
for (const auto & part : data)
@ -203,6 +203,37 @@ size_t ColumnObject::Subcolumn::Subcolumn::allocatedBytes() const
return res;
}
void ColumnObject::Subcolumn::get(size_t n, Field & res) const
{
if (isFinalized())
{
getFinalizedColumn().get(n, res);
return;
}
size_t ind = n;
if (ind < num_of_defaults_in_prefix)
{
res = least_common_type.get()->getDefault();
return;
}
ind -= num_of_defaults_in_prefix;
for (const auto & part : data)
{
if (ind < part->size())
{
part->get(ind, res);
res = convertFieldToTypeOrThrow(res, *least_common_type.get());
return;
}
ind -= part->size();
}
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Index ({}) for getting field is out of range", n);
}
void ColumnObject::Subcolumn::checkTypes() const
{
DataTypes prefix_types;
@ -221,7 +252,7 @@ void ColumnObject::Subcolumn::checkTypes() const
void ColumnObject::Subcolumn::insert(Field field)
{
auto info = getFieldInfo(field);
auto info = DB::getFieldInfo(field);
insert(std::move(field), std::move(info));
}
@ -244,8 +275,8 @@ static bool isConversionRequiredBetweenIntegers(const IDataType & lhs, const IDa
bool is_native_int = which_lhs.isNativeInt() && which_rhs.isNativeInt();
bool is_native_uint = which_lhs.isNativeUInt() && which_rhs.isNativeUInt();
return (is_native_int || is_native_uint)
&& lhs.getSizeOfValueInMemory() <= rhs.getSizeOfValueInMemory();
return (!is_native_int && !is_native_uint)
|| lhs.getSizeOfValueInMemory() > rhs.getSizeOfValueInMemory();
}
void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
@ -288,7 +319,7 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
}
else if (!least_common_base_type->equals(*base_type) && !isNothing(base_type))
{
if (!isConversionRequiredBetweenIntegers(*base_type, *least_common_base_type))
if (isConversionRequiredBetweenIntegers(*base_type, *least_common_base_type))
{
base_type = getLeastSupertype(DataTypes{std::move(base_type), least_common_base_type}, true);
type_changed = true;
@ -305,35 +336,96 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
void ColumnObject::Subcolumn::insertRangeFrom(const Subcolumn & src, size_t start, size_t length)
{
assert(src.isFinalized());
const auto & src_column = src.data.back();
const auto & src_type = src.least_common_type.get();
assert(start + length <= src.size());
size_t end = start + length;
if (data.empty())
{
addNewColumnPart(src.least_common_type.get());
data.back()->insertRangeFrom(*src_column, start, length);
addNewColumnPart(src.getLeastCommonType());
}
else if (least_common_type.get()->equals(*src_type))
else if (!least_common_type.get()->equals(*src.getLeastCommonType()))
{
data.back()->insertRangeFrom(*src_column, start, length);
}
else
{
auto new_least_common_type = getLeastSupertype(DataTypes{least_common_type.get(), src_type}, true);
auto casted_column = castColumn({src_column, src_type, ""}, new_least_common_type);
if (!least_common_type.get()->equals(*new_least_common_type))
auto new_least_common_type = getLeastSupertype(DataTypes{least_common_type.get(), src.getLeastCommonType()}, true);
if (!new_least_common_type->equals(*least_common_type.get()))
addNewColumnPart(std::move(new_least_common_type));
}
data.back()->insertRangeFrom(*casted_column, start, length);
if (end <= src.num_of_defaults_in_prefix)
{
data.back()->insertManyDefaults(length);
return;
}
if (start < src.num_of_defaults_in_prefix)
data.back()->insertManyDefaults(src.num_of_defaults_in_prefix - start);
auto insert_from_part = [&](const auto & column, size_t from, size_t n)
{
assert(from + n <= column->size());
auto column_type = getDataTypeByColumn(*column);
if (column_type->equals(*least_common_type.get()))
{
data.back()->insertRangeFrom(*column, from, n);
return;
}
/// If we need to insert a large range, there is no sense in cutting a part of the column and casting it.
/// Casting the whole column and inserting from it can be faster.
/// The threshold is just a guess.
if (n * 3 >= column->size())
{
auto casted_column = castColumn({column, column_type, ""}, least_common_type.get());
data.back()->insertRangeFrom(*casted_column, from, n);
return;
}
auto casted_column = column->cut(from, n);
casted_column = castColumn({casted_column, column_type, ""}, least_common_type.get());
data.back()->insertRangeFrom(*casted_column, 0, n);
};
size_t pos = 0;
size_t processed_rows = src.num_of_defaults_in_prefix;
/// Find the first part of the column that intersects the range.
while (pos < src.data.size() && processed_rows + src.data[pos]->size() < start)
{
processed_rows += src.data[pos]->size();
++pos;
}
/// Insert from the first part of column.
if (pos < src.data.size() && processed_rows < start)
{
size_t part_start = start - processed_rows;
size_t part_length = std::min(src.data[pos]->size() - part_start, end - start);
insert_from_part(src.data[pos], part_start, part_length);
processed_rows += src.data[pos]->size();
++pos;
}
/// Insert from the parts of column in the middle of range.
while (pos < src.data.size() && processed_rows + src.data[pos]->size() < end)
{
insert_from_part(src.data[pos], 0, src.data[pos]->size());
processed_rows += src.data[pos]->size();
++pos;
}
/// Insert from the last part of column if needed.
if (pos < src.data.size() && processed_rows < end)
{
size_t part_end = end - processed_rows;
insert_from_part(src.data[pos], 0, part_end);
}
}
bool ColumnObject::Subcolumn::isFinalized() const
{
return data.empty() ||
(data.size() == 1 && !data[0]->isSparse() && num_of_defaults_in_prefix == 0);
return num_of_defaults_in_prefix == 0 &&
(data.empty() || (data.size() == 1 && !data[0]->isSparse()));
}
void ColumnObject::Subcolumn::finalize()
@ -432,6 +524,13 @@ void ColumnObject::Subcolumn::popBack(size_t n)
num_of_defaults_in_prefix -= n;
}
ColumnObject::Subcolumn ColumnObject::Subcolumn::cut(size_t start, size_t length) const
{
Subcolumn new_subcolumn(0, is_nullable);
new_subcolumn.insertRangeFrom(*this, start, length);
return new_subcolumn;
}
Field ColumnObject::Subcolumn::getLastField() const
{
if (data.empty())
@ -442,6 +541,18 @@ Field ColumnObject::Subcolumn::getLastField() const
return (*last_part)[last_part->size() - 1];
}
FieldInfo ColumnObject::Subcolumn::getFieldInfo() const
{
const auto & base_type = least_common_type.getBase();
return FieldInfo
{
.scalar_type = base_type,
.have_nulls = base_type->isNullable(),
.need_convert = false,
.num_dimensions = least_common_type.getNumberOfDimensions(),
};
}
ColumnObject::Subcolumn ColumnObject::Subcolumn::recreateWithDefaultValues(const FieldInfo & field_info) const
{
auto scalar_type = field_info.scalar_type;
@ -479,6 +590,13 @@ const ColumnPtr & ColumnObject::Subcolumn::getFinalizedColumnPtr() const
return data[0];
}
ColumnObject::Subcolumn::LeastCommonType::LeastCommonType()
: type(std::make_shared<DataTypeNothing>())
, base_type(type)
, num_dimensions(0)
{
}
ColumnObject::Subcolumn::LeastCommonType::LeastCommonType(DataTypePtr type_)
: type(std::move(type_))
, base_type(getBaseTypeOfArray(type))
@ -525,16 +643,6 @@ size_t ColumnObject::size() const
return num_rows;
}
MutableColumnPtr ColumnObject::cloneResized(size_t new_size) const
{
/// cloneResized with new_size == 0 is used for cloneEmpty().
if (new_size != 0)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"ColumnObject doesn't support resize to non-zero length");
return ColumnObject::create(is_nullable);
}
size_t ColumnObject::byteSize() const
{
size_t res = 0;
@ -553,23 +661,21 @@ size_t ColumnObject::allocatedBytes() const
void ColumnObject::forEachSubcolumn(ColumnCallback callback)
{
if (!isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot iterate over non-finalized ColumnObject");
for (auto & entry : subcolumns)
callback(entry->data.data.back());
for (auto & part : entry->data.data)
callback(part);
}
void ColumnObject::insert(const Field & field)
{
const auto & object = field.get<const Object &>();
HashSet<StringRef, StringRefHash> inserted;
HashSet<StringRef, StringRefHash> inserted_paths;
size_t old_size = size();
for (const auto & [key_str, value] : object)
{
PathInData key(key_str);
inserted.insert(key_str);
inserted_paths.insert(key_str);
if (!hasSubcolumn(key))
addSubcolumn(key, old_size);
@ -578,8 +684,14 @@ void ColumnObject::insert(const Field & field)
}
for (auto & entry : subcolumns)
if (!inserted.has(entry->path.getPath()))
entry->data.insertDefault();
{
if (!inserted_paths.has(entry->path.getPath()))
{
bool inserted = tryInsertDefaultFromNested(entry);
if (!inserted)
entry->data.insertDefault();
}
}
++num_rows;
}
@ -594,26 +706,21 @@ void ColumnObject::insertDefault()
Field ColumnObject::operator[](size_t n) const
{
if (!isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get Field from non-finalized ColumnObject");
Object object;
for (const auto & entry : subcolumns)
object[entry->path.getPath()] = (*entry->data.data.back())[n];
Field object;
get(n, object);
return object;
}
void ColumnObject::get(size_t n, Field & res) const
{
if (!isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get Field from non-finalized ColumnObject");
assert(n < size());
res = Object();
auto & object = res.get<Object &>();
for (const auto & entry : subcolumns)
{
auto it = object.try_emplace(entry->path.getPath()).first;
entry->data.data.back()->get(n, it->second);
entry->data.get(n, it->second);
}
}
@ -626,41 +733,28 @@ void ColumnObject::insertFrom(const IColumn & src, size_t n)
void ColumnObject::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
const auto & src_object = assert_cast<const ColumnObject &>(src);
if (!src_object.isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insertRangeFrom non-finalized ColumnObject");
for (auto & entry : subcolumns)
{
if (src_object.hasSubcolumn(entry->path))
entry->data.insertRangeFrom(src_object.getSubcolumn(entry->path), start, length);
else
entry->data.insertManyDefaults(length);
}
for (const auto & entry : src_object.subcolumns)
{
if (!hasSubcolumn(entry->path))
{
if (entry->path.hasNested())
{
const auto & base_type = entry->data.getLeastCommonTypeBase();
FieldInfo field_info
{
.scalar_type = base_type,
.have_nulls = base_type->isNullable(),
.need_convert = false,
.num_dimensions = entry->data.getNumberOfDimensions(),
};
addNestedSubcolumn(entry->path, field_info, num_rows);
}
addNestedSubcolumn(entry->path, entry->data.getFieldInfo(), num_rows);
else
{
addSubcolumn(entry->path, num_rows);
}
}
auto & subcolumn = getSubcolumn(entry->path);
subcolumn.insertRangeFrom(entry->data, start, length);
auto & subcolumn = getSubcolumn(entry->path);
subcolumn.insertRangeFrom(entry->data, start, length);
}
for (auto & entry : subcolumns)
{
if (!src_object.hasSubcolumn(entry->path))
{
bool inserted = tryInsertManyDefaultsFromNested(entry);
if (!inserted)
entry->data.insertManyDefaults(length);
}
}
@ -668,21 +762,6 @@ void ColumnObject::insertRangeFrom(const IColumn & src, size_t start, size_t len
finalize();
}
ColumnPtr ColumnObject::replicate(const Offsets & offsets) const
{
if (!isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot replicate non-finalized ColumnObject");
auto res_column = ColumnObject::create(is_nullable);
for (const auto & entry : subcolumns)
{
auto replicated_data = entry->data.data.back()->replicate(offsets)->assumeMutable();
res_column->addSubcolumn(entry->path, std::move(replicated_data));
}
return res_column;
}
void ColumnObject::popBack(size_t length)
{
for (auto & entry : subcolumns)
@ -692,10 +771,15 @@ void ColumnObject::popBack(size_t length)
}
template <typename Func>
ColumnPtr ColumnObject::applyForSubcolumns(Func && func, std::string_view func_name) const
MutableColumnPtr ColumnObject::applyForSubcolumns(Func && func) const
{
if (!isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot {} non-finalized ColumnObject", func_name);
{
auto finalized = IColumn::mutate(getPtr());
auto & finalized_object = assert_cast<ColumnObject &>(*finalized);
finalized_object.finalize();
return finalized_object.applyForSubcolumns(std::forward<Func>(func));
}
auto res = ColumnObject::create(is_nullable);
for (const auto & subcolumn : subcolumns)
@ -703,22 +787,36 @@ ColumnPtr ColumnObject::applyForSubcolumns(Func && func, std::string_view func_n
auto new_subcolumn = func(subcolumn->data.getFinalizedColumn());
res->addSubcolumn(subcolumn->path, new_subcolumn->assumeMutable());
}
return res;
}
ColumnPtr ColumnObject::permute(const Permutation & perm, size_t limit) const
{
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.permute(perm, limit); }, "permute");
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.permute(perm, limit); });
}
ColumnPtr ColumnObject::filter(const Filter & filter, ssize_t result_size_hint) const
{
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.filter(filter, result_size_hint); }, "filter");
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.filter(filter, result_size_hint); });
}
ColumnPtr ColumnObject::index(const IColumn & indexes, size_t limit) const
{
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.index(indexes, limit); }, "index");
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.index(indexes, limit); });
}
ColumnPtr ColumnObject::replicate(const Offsets & offsets) const
{
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.replicate(offsets); });
}
MutableColumnPtr ColumnObject::cloneResized(size_t new_size) const
{
if (new_size == 0)
return ColumnObject::create(is_nullable);
return applyForSubcolumns([&](const auto & subcolumn) { return subcolumn.cloneResized(new_size); });
}
const ColumnObject::Subcolumn & ColumnObject::getSubcolumn(const PathInData & key) const
@ -810,6 +908,92 @@ void ColumnObject::addNestedSubcolumn(const PathInData & key, const FieldInfo &
if (num_rows == 0)
num_rows = new_size;
else if (new_size != num_rows)
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH,
"Required size of subcolumn {} ({}) is inconsistent with column size ({})",
key.getPath(), new_size, num_rows);
}
const ColumnObject::Subcolumns::Node * ColumnObject::getLeafOfTheSameNested(const Subcolumns::NodePtr & entry) const
{
if (!entry->path.hasNested())
return nullptr;
size_t old_size = entry->data.size();
const auto * current_node = subcolumns.findLeaf(entry->path);
const Subcolumns::Node * leaf = nullptr;
while (current_node)
{
/// Try to find the first Nested up to the current node.
const auto * node_nested = subcolumns.findParent(current_node,
[](const auto & candidate) { return candidate.isNested(); });
if (!node_nested)
break;
/// Find the leaf with subcolumn that contains values
/// for the last rows.
/// If there are no leaves, skip current node and find
/// the next node up to the current.
leaf = subcolumns.findLeaf(node_nested,
[&](const auto & candidate)
{
return candidate.data.size() > old_size;
});
if (leaf)
break;
current_node = node_nested->parent;
}
if (leaf && isNothing(leaf->data.getLeastCommonTypeBase()))
return nullptr;
return leaf;
}
bool ColumnObject::tryInsertManyDefaultsFromNested(const Subcolumns::NodePtr & entry) const
{
const auto * leaf = getLeafOfTheSameNested(entry);
if (!leaf)
return false;
size_t old_size = entry->data.size();
auto field_info = entry->data.getFieldInfo();
/// Cut the needed range from the found leaf
/// and replace scalar values with the correct
/// default values for the given entry.
auto new_subcolumn = leaf->data
.cut(old_size, leaf->data.size() - old_size)
.recreateWithDefaultValues(field_info);
entry->data.insertRangeFrom(new_subcolumn, 0, new_subcolumn.size());
return true;
}
bool ColumnObject::tryInsertDefaultFromNested(const Subcolumns::NodePtr & entry) const
{
const auto * leaf = getLeafOfTheSameNested(entry);
if (!leaf)
return false;
auto last_field = leaf->data.getLastField();
if (last_field.isNull())
return false;
size_t leaf_num_dimensions = leaf->data.getNumberOfDimensions();
size_t entry_num_dimensions = entry->data.getNumberOfDimensions();
auto default_scalar = entry_num_dimensions > leaf_num_dimensions
? createEmptyArrayField(entry_num_dimensions - leaf_num_dimensions)
: entry->data.getLeastCommonTypeBase()->getDefault();
auto default_field = applyVisitor(FieldVisitorReplaceScalars(default_scalar, leaf_num_dimensions), last_field);
entry->data.insert(std::move(default_field));
return true;
}
PathsInData ColumnObject::getKeys() const
@ -835,7 +1019,7 @@ void ColumnObject::finalize()
{
const auto & least_common_type = entry->data.getLeastCommonType();
/// Do not add subcolumns, which consists only from NULLs.
/// Do not add subcolumns, which consist only of NULLs.
if (isNothing(getBaseTypeOfArray(least_common_type)))
continue;
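The new `Subcolumn::insertRangeFrom` above walks only the parts of the source subcolumn that intersect `[start, start + length)`: the defaults prefix, the first intersecting part, the fully covered middle parts, and the tail. A minimal standalone sketch of that part-walking pattern, with a plain `std::vector<std::vector<int>>` standing in for the column parts (an illustration, not the ClickHouse types):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

/// Copy rows [start, start + length) out of a chunked container, visiting
/// only the parts that intersect the range, in the spirit of the new
/// Subcolumn::insertRangeFrom. The container type is a stand-in.
std::vector<int> copyRange(const std::vector<std::vector<int>> & parts, size_t start, size_t length)
{
    std::vector<int> result;
    size_t end = start + length;
    size_t processed_rows = 0;

    for (const auto & part : parts)
    {
        size_t part_begin = processed_rows;
        size_t part_end = processed_rows + part.size();
        processed_rows = part_end;

        if (part_end <= start)
            continue; /// Entirely before the range.
        if (part_begin >= end)
            break; /// Entirely after the range, nothing more to do.

        /// Clamp the range to this part and copy the overlap.
        size_t from = std::max(start, part_begin) - part_begin;
        size_t to = std::min(end, part_end) - part_begin;
        result.insert(result.end(), part.begin() + from, part.begin() + to);
    }

    assert(result.size() == length);
    return result;
}

int main()
{
    std::vector<std::vector<int>> parts = {{0, 1, 2}, {3, 4}, {5, 6, 7, 8}};
    for (int value : copyRange(parts, 2, 5)) /// Prints 2 3 4 5 6.
        std::cout << value << ' ';
}
```

The real implementation additionally casts each visited part to the least common type, cutting the part first when only a small fraction of it is needed (the `n * 3 >= column->size()` guess above).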

View File

@ -65,6 +65,7 @@ public:
size_t size() const;
size_t byteSize() const;
size_t allocatedBytes() const;
void get(size_t n, Field & res) const;
bool isFinalized() const;
const DataTypePtr & getLeastCommonType() const { return least_common_type.get(); }
@ -84,6 +85,8 @@ public:
void insertRangeFrom(const Subcolumn & src, size_t start, size_t length);
void popBack(size_t n);
Subcolumn cut(size_t start, size_t length) const;
/// Converts all column's parts to the common type and
/// creates a single column that stores all values.
void finalize();
@ -91,6 +94,8 @@ public:
/// Returns last inserted field.
Field getLastField() const;
FieldInfo getFieldInfo() const;
/// Recreates subcolumn with default scalar values and keeps sizes of arrays.
/// Used to create columns of type Nested with consistent array sizes.
Subcolumn recreateWithDefaultValues(const FieldInfo & field_info) const;
@ -101,13 +106,16 @@ public:
const IColumn & getFinalizedColumn() const;
const ColumnPtr & getFinalizedColumnPtr() const;
const std::vector<WrappedPtr> & getData() const { return data; }
size_t getNumberOfDefaultsInPrefix() const { return num_of_defaults_in_prefix; }
friend class ColumnObject;
private:
class LeastCommonType
{
public:
LeastCommonType() = default;
LeastCommonType();
explicit LeastCommonType(DataTypePtr type_);
const DataTypePtr & get() const { return type; }
@ -175,6 +183,11 @@ public:
/// It cares about consistency of sizes of Nested arrays.
void addNestedSubcolumn(const PathInData & key, const FieldInfo & field_info, size_t new_size);
/// Finds a subcolumn from the same Nested type as @entry and inserts
/// an array of default values with sizes consistent with the Nested type.
bool tryInsertDefaultFromNested(const Subcolumns::NodePtr & entry) const;
bool tryInsertManyDefaultsFromNested(const Subcolumns::NodePtr & entry) const;
const Subcolumns & getSubcolumns() const { return subcolumns; }
Subcolumns & getSubcolumns() { return subcolumns; }
PathsInData getKeys() const;
@ -189,7 +202,6 @@ public:
TypeIndex getDataType() const override { return TypeIndex::Object; }
size_t size() const override;
MutableColumnPtr cloneResized(size_t new_size) const override;
size_t byteSize() const override;
size_t allocatedBytes() const override;
void forEachSubcolumn(ColumnCallback callback) override;
@ -197,13 +209,14 @@ public:
void insertDefault() override;
void insertFrom(const IColumn & src, size_t n) override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
ColumnPtr replicate(const Offsets & offsets) const override;
void popBack(size_t length) override;
Field operator[](size_t n) const override;
void get(size_t n, Field & res) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumnPtr cloneResized(size_t new_size) const override;
/// All other methods throw exception.
@ -236,7 +249,11 @@ private:
}
template <typename Func>
ColumnPtr applyForSubcolumns(Func && func, std::string_view func_name) const;
MutableColumnPtr applyForSubcolumns(Func && func) const;
/// For a given subcolumn, returns a subcolumn from the same Nested type.
/// It's used to get the shared sizes of Nested to insert correct default values.
const Subcolumns::Node * getLeafOfTheSameNested(const Subcolumns::NodePtr & entry) const;
};
}
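`recreateWithDefaultValues` (together with the `FieldVisitorReplaceScalars` used by `tryInsertDefaultFromNested`) keeps the array sizes of a Nested value while replacing every scalar leaf with a default. A toy sketch of that shape-preserving replacement on a hand-rolled recursive field type (illustrative names, not the real `DB::Field`):

```cpp
#include <iostream>
#include <variant>
#include <vector>

struct Field;
using Array = std::vector<Field>;

/// Toy recursive field: either a scalar or an array of fields.
struct Field
{
    std::variant<int, Array> value;
};

/// Keep the nesting shape, replace every scalar leaf with the default.
Field replaceScalars(const Field & field, int default_scalar)
{
    if (const auto * array = std::get_if<Array>(&field.value))
    {
        Array result;
        for (const auto & elem : *array)
            result.push_back(replaceScalars(elem, default_scalar)); /// Recurse, preserving sizes.
        return {result};
    }
    return {default_scalar};
}

int main()
{
    /// [[1, 2], [3]] -> [[0, 0], [0]]: array sizes preserved, values defaulted,
    /// which is what keeps Nested subcolumns consistent.
    Field nested{Array{Field{Array{Field{1}, Field{2}}}, Field{Array{Field{3}}}}};
    Field result = replaceScalars(nested, 0);
    std::cout << std::get<Array>(result.value).size() << '\n'; /// 2
}
```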

View File

@ -0,0 +1,120 @@
#include <Common/FieldVisitorsAccurateComparison.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
#include <Columns/ColumnObject.h>
#include <Common/FieldVisitorToString.h>
#include <Common/randomSeed.h>
#include <fmt/core.h>
#include <pcg_random.hpp>
#include <gtest/gtest.h>
#include <random>
using namespace DB;
static pcg64 rng(randomSeed());
Field getRandomField(size_t type)
{
switch (type)
{
case 0:
return rng();
case 1:
return std::uniform_real_distribution<>(0.0, 1.0)(rng);
case 2:
return std::string(rng() % 10, 'a' + rng() % 26);
default:
return Field();
}
}
std::pair<ColumnObject::Subcolumn, std::vector<Field>> generate(size_t size)
{
bool has_defaults = rng() % 3 == 0;
size_t num_defaults = has_defaults ? rng() % size : 0;
ColumnObject::Subcolumn subcolumn(num_defaults, false);
std::vector<Field> fields;
while (subcolumn.size() < size)
{
size_t part_size = rng() % (size - subcolumn.size()) + 1;
size_t field_type = rng() % 3;
for (size_t i = 0; i < part_size; ++i)
{
fields.push_back(getRandomField(field_type));
subcolumn.insert(fields.back());
}
}
std::vector<Field> result_fields;
for (size_t i = 0; i < num_defaults; ++i)
result_fields.emplace_back();
result_fields.insert(result_fields.end(), fields.begin(), fields.end());
return {std::move(subcolumn), std::move(result_fields)};
}
void checkFieldsAreEqual(ColumnObject::Subcolumn subcolumn, const std::vector<Field> & fields)
{
ASSERT_EQ(subcolumn.size(), fields.size());
for (size_t i = 0; i < subcolumn.size(); ++i)
{
Field field;
subcolumn.get(i, field); // Also check 'get' method.
if (!applyVisitor(FieldVisitorAccurateEquals(), field, fields[i]))
{
std::cerr << fmt::format("Wrong value at position {}, expected {}, got {}",
i, applyVisitor(FieldVisitorToString(), fields[i]), applyVisitor(FieldVisitorToString(), field));
ASSERT_TRUE(false);
}
}
}
constexpr size_t T = 1000;
constexpr size_t N = 1000;
TEST(ColumnObject, InsertRangeFrom)
{
for (size_t t = 0; t < T; ++t)
{
auto [subcolumn_dst, fields_dst] = generate(N);
auto [subcolumn_src, fields_src] = generate(N);
ASSERT_EQ(subcolumn_dst.size(), fields_dst.size());
ASSERT_EQ(subcolumn_src.size(), fields_src.size());
const auto & type_dst = subcolumn_dst.getLeastCommonType();
const auto & type_src = subcolumn_src.getLeastCommonType();
auto type_res = getLeastSupertype(DataTypes{type_dst, type_src}, true);
size_t from = rng() % subcolumn_src.size();
size_t to = rng() % subcolumn_src.size();
if (from > to)
std::swap(from, to);
++to;
for (auto & field : fields_dst)
{
if (field.isNull())
field = type_res->getDefault();
else
field = convertFieldToTypeOrThrow(field, *type_res);
}
for (size_t i = from; i < to; ++i)
{
if (fields_src[i].isNull())
fields_dst.push_back(type_res->getDefault());
else
fields_dst.push_back(convertFieldToTypeOrThrow(fields_src[i], *type_res));
}
subcolumn_dst.insertRangeFrom(subcolumn_src, from, to - from);
checkFieldsAreEqual(subcolumn_dst, fields_dst);
}
}

View File

@ -11,7 +11,7 @@
#include <Common/FieldVisitors.h>
using namespace DB;
pcg64 rng(randomSeed());
static pcg64 rng(randomSeed());
std::pair<MutableColumnPtr, MutableColumnPtr> createColumns(size_t n, size_t k)
{

View File

@ -554,54 +554,19 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
auto query_context = enable_filesystem_query_cache_limit ? getCurrentQueryContext(cache_lock) : nullptr;
if (!query_context)
return tryReserveForMainList(key, offset, size, nullptr, cache_lock);
/// If the context can be found, subsequent cache replacements are made through the Query context.
if (query_context)
{
auto res = tryReserveForQuery(key, offset, size, query_context, cache_lock);
switch (res)
{
case ReserveResult::FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED :
{
/// When the maximum cache size of the query is reached, the cache will be
/// evicted from the history cache accessed by the current query.
return true;
}
case ReserveResult::EXCEEDS_QUERY_LIMIT :
{
/// The query currently does not have enough space to reserve.
/// It returns false and reads data directly from the remote fs.
return false;
}
case ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST :
{
/// When the maximum cache capacity of the request is not reached, the cache
/// block is evicted from the main LRU queue.
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
}
}
__builtin_unreachable();
}
else
{
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
}
}
LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock)
{
/// The maximum cache capacity of the request is not reached, thus the
//// cache block is evicted from the main LRU queue by tryReserveForMainList().
if (query_context->getCacheSize() + size <= query_context->getMaxCacheSize())
{
return ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST;
}
else if (query_context->getCacheSize() + size <= query_context->getMaxCacheSize())
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
/// When skip_download_if_exceeds_query_cache is true, there is no need
/// to evict old data, skip the cache and read directly from remote fs.
else if (query_context->isSkipDownloadIfExceed())
{
return ReserveResult::EXCEEDS_QUERY_LIMIT;
}
return false;
/// The maximum cache size of the query is reached, the cache will be
/// evicted from the history cache accessed by the current query.
else
@ -617,7 +582,7 @@ LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, si
auto is_overflow = [&]
{
return (max_size != 0 && queue.getTotalWeight(cache_lock) + size - removed_size > max_size)
return (max_size != 0 && queue.getTotalCacheSize(cache_lock) + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size)
|| (query_context->getCacheSize() + size - removed_size > query_context->getMaxCacheSize());
};
@ -666,26 +631,26 @@ LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, si
}
}
auto remove_file_segment = [&](FileSegmentPtr file_segment, size_t file_segment_size)
{
query_context->remove(file_segment->key(), file_segment->offset(), file_segment_size, cache_lock);
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
};
assert(trash.empty());
for (auto & cell : trash)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
query_context->remove(file_segment->key(), file_segment->offset(), cell->size(), cache_lock);
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
if (auto file_segment = cell->file_segment)
remove_file_segment(file_segment, cell->size());
}
for (auto & iter : ghost)
query_context->remove(iter->key, iter->offset, iter->size, cache_lock);
if (is_overflow())
{
return ReserveResult::EXCEEDS_QUERY_LIMIT;
}
return false;
if (cell_for_reserve)
{
@ -698,18 +663,12 @@ LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, si
for (auto & cell : to_evict)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
query_context->remove(file_segment->key(), file_segment->offset(), cell->size(), cache_lock);
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
if (auto file_segment = cell->file_segment)
remove_file_segment(file_segment, cell->size());
}
query_context->reserve(key, offset, size, cache_lock);
return ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST;
return true;
}
}
@ -732,7 +691,7 @@ bool LRUFileCache::tryReserveForMainList(
auto is_overflow = [&]
{
/// max_size == 0 means unlimited cache size, max_element_size == 0 means unlimited number of cache elements.
return (max_size != 0 && queue.getTotalWeight(cache_lock) + size - removed_size > max_size)
return (max_size != 0 && queue.getTotalCacheSize(cache_lock) + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size);
};
@ -785,18 +744,19 @@ bool LRUFileCache::tryReserveForMainList(
}
}
auto remove_file_segment = [&](FileSegmentPtr file_segment)
{
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
};
/// This case is very unlikely, can happen in case of exception from
/// file_segment->complete(), which would be a logical error.
assert(trash.empty());
for (auto & cell : trash)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
if (auto file_segment = cell->file_segment)
remove_file_segment(file_segment);
}
if (is_overflow())
@ -817,15 +777,11 @@ bool LRUFileCache::tryReserveForMainList(
for (auto & cell : to_evict)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
if (auto file_segment = cell->file_segment)
remove_file_segment(file_segment);
}
if (queue.getTotalWeight(cache_lock) > (1ull << 63))
if (queue.getTotalCacheSize(cache_lock) > (1ull << 63))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
if (query_context)
@ -1116,7 +1072,7 @@ size_t LRUFileCache::getUsedCacheSize() const
size_t LRUFileCache::getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
return queue.getTotalWeight(cache_lock);
return queue.getTotalCacheSize(cache_lock);
}
size_t LRUFileCache::getAvailableCacheSize() const
@ -1305,8 +1261,8 @@ void LRUFileCache::assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lo
total_size += size;
}
assert(total_size == queue.getTotalWeight(cache_lock));
assert(queue.getTotalWeight(cache_lock) <= max_size);
assert(total_size == queue.getTotalCacheSize(cache_lock));
assert(queue.getTotalCacheSize(cache_lock) <= max_size);
assert(queue.getElementsNum(cache_lock) <= max_element_size);
}
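Both reservation paths now share the same shape: build an `is_overflow` predicate over the byte and element budgets, evict LRU entries until it clears, then account for the new entry. A minimal sketch of that pattern with stand-in types (not the real cache API, which also handles file segments, trash, and per-query contexts):

```cpp
#include <cstddef>
#include <list>

struct Entry { size_t size; };

/// Evict least-recently-used entries until `size` more bytes fit,
/// then reserve them. A zero budget means "unlimited", as in the cache.
bool tryReserve(std::list<Entry> & lru, size_t & total_size, size_t size,
                size_t max_size, size_t max_elements)
{
    auto is_overflow = [&]
    {
        return (max_size != 0 && total_size + size > max_size)
            || (max_elements != 0 && lru.size() + 1 > max_elements);
    };

    while (is_overflow() && !lru.empty())
    {
        total_size -= lru.front().size; /// Evict from the LRU end.
        lru.pop_front();
    }

    if (is_overflow())
        return false; /// Still does not fit even with an empty queue.

    lru.push_back({size});
    total_size += size;
    return true;
}

int main()
{
    std::list<Entry> lru = {{40}, {30}};
    size_t total = 70;
    /// Evicts the 40-byte entry, then 30 + 50 <= 100 fits.
    return tryReserve(lru, total, 50, /*max_size=*/ 100, /*max_elements=*/ 10) ? 0 : 1;
}
```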

View File

@ -130,7 +130,7 @@ protected:
using Iterator = typename std::list<FileKeyAndOffset>::iterator;
size_t getTotalWeight(std::lock_guard<std::mutex> & /* cache_lock */) const { return cache_size; }
size_t getTotalCacheSize(std::lock_guard<std::mutex> & /* cache_lock */) const { return cache_size; }
size_t getElementsNum(std::lock_guard<std::mutex> & /* cache_lock */) const { return queue.size(); }
@ -356,13 +356,6 @@ private:
size_t max_stash_element_size;
size_t enable_cache_hits_threshold;
enum class ReserveResult
{
FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED,
EXCEEDS_QUERY_LIMIT,
FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST,
};
Poco::Logger * log;
FileSegments getImpl(
@ -387,12 +380,6 @@ private:
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
/// Limit the maximum cache size for current query.
LRUFileCache::ReserveResult tryReserveForQuery(
const Key & key, size_t offset, size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,

View File

@ -9,13 +9,13 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
Int32 IntervalKind::toAvgSeconds() const
Float64 IntervalKind::toAvgSeconds() const
{
switch (kind)
{
case IntervalKind::Nanosecond:
case IntervalKind::Microsecond:
case IntervalKind::Millisecond: return 0; /// fractional parts of seconds have 0 seconds
case IntervalKind::Nanosecond: return 0.000000001;
case IntervalKind::Microsecond: return 0.000001;
case IntervalKind::Millisecond: return 0.001;
case IntervalKind::Second: return 1;
case IntervalKind::Minute: return 60;
case IntervalKind::Hour: return 3600;
@ -28,6 +28,25 @@ Int32 IntervalKind::toAvgSeconds() const
__builtin_unreachable();
}
bool IntervalKind::isFixedLength() const
{
switch (kind)
{
case IntervalKind::Nanosecond:
case IntervalKind::Microsecond:
case IntervalKind::Millisecond:
case IntervalKind::Second:
case IntervalKind::Minute:
case IntervalKind::Hour:
case IntervalKind::Day:
case IntervalKind::Week: return true;
case IntervalKind::Month:
case IntervalKind::Quarter:
case IntervalKind::Year: return false;
}
__builtin_unreachable();
}
IntervalKind IntervalKind::fromAvgSeconds(Int64 num_seconds)
{
if (num_seconds)

View File

@ -31,12 +31,15 @@ struct IntervalKind
/// Returns number of seconds in one interval.
/// For `Month`, `Quarter` and `Year` the function returns an average number of seconds.
Int32 toAvgSeconds() const;
Float64 toAvgSeconds() const;
/// Chooses an interval kind based on number of seconds.
/// For example, `IntervalKind::fromAvgSeconds(3600)` returns `IntervalKind::Hour`.
static IntervalKind fromAvgSeconds(Int64 num_seconds);
/// Returns whether IntervalKind has a fixed number of seconds (e.g. Day) or non-fixed (e.g. Month)
bool isFixedLength() const;
/// Returns an uppercased version of what `toString()` returns.
const char * toKeyword() const;
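Returning `Float64` matters for the sub-second kinds: with `Int32`, Nanosecond through Millisecond could only report 0 seconds, which would zero out any computation that scales by the interval length (as `nonNegativeDerivative` below does). A simplified mirror of the idea, with a toy enum rather than the real `IntervalKind`:

```cpp
#include <iostream>

enum class Kind { Millisecond, Second, Minute };

/// With an integer return type, Millisecond could only report 0 seconds;
/// Float64 keeps sub-second kinds meaningful.
double toAvgSeconds(Kind kind)
{
    switch (kind)
    {
        case Kind::Millisecond: return 0.001;
        case Kind::Second: return 1;
        case Kind::Minute: return 60;
    }
    return 0;
}

int main()
{
    /// e.g. INTERVAL 100 MILLISECOND scales a per-second rate by 0.1.
    std::cout << toAvgSeconds(Kind::Millisecond) * 100 << '\n'; /// 0.1
}
```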

View File

@ -17,6 +17,11 @@ namespace std
using namespace experimental::coroutines_v1;
}
#if __has_warning("-Wdeprecated-experimental-coroutine")
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-experimental-coroutine"
#endif
#else
#include <coroutine>
#pragma GCC diagnostic push

View File

@ -718,9 +718,9 @@ void replaceMissedSubcolumnsByConstants(
addConstantToWithClause(query, name, type);
}
void finalizeObjectColumns(MutableColumns & columns)
void finalizeObjectColumns(const MutableColumns & columns)
{
for (auto & column : columns)
for (const auto & column : columns)
if (auto * column_object = typeid_cast<ColumnObject *>(column.get()))
column_object->finalize();
}
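The change to `const MutableColumns &` works because the `const` applies to the container, not to the columns it holds: the elements can still be mutated through the (now const) pointers. A small sketch of why this compiles, with `shared_ptr` standing in for the real wrapped column pointer type:

```cpp
#include <memory>
#include <vector>

struct Column { void finalize() {} };
using MutableColumns = std::vector<std::shared_ptr<Column>>;

/// The vector is const, so no columns can be added or removed,
/// but each pointee is non-const and can still be finalized.
void finalizeAll(const MutableColumns & columns)
{
    for (const auto & column : columns)
        column->finalize(); /// OK: the pointer is const, the pointee is not.
}

int main()
{
    MutableColumns columns = {std::make_shared<Column>()};
    finalizeAll(columns);
}
```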

View File

@ -51,7 +51,7 @@ void extendObjectColumns(NamesAndTypesList & columns_list, const ColumnsDescript
NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list);
bool hasObjectColumns(const ColumnsDescription & columns);
void finalizeObjectColumns(MutableColumns & columns);
void finalizeObjectColumns(const MutableColumns & columns);
/// Updates types of objects in @object_columns inplace
/// according to types in new_columns.

View File

@ -33,71 +33,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
using Node = typename ColumnObject::Subcolumns::Node;
/// Finds a subcolumn from the same Nested type as @entry and inserts
/// an array with default values with consistent sizes as in Nested type.
bool tryInsertDefaultFromNested(
const std::shared_ptr<Node> & entry, const ColumnObject::Subcolumns & subcolumns)
{
if (!entry->path.hasNested())
return false;
const Node * current_node = subcolumns.findLeaf(entry->path);
const Node * leaf = nullptr;
size_t num_skipped_nested = 0;
while (current_node)
{
/// Try to find the first Nested up to the current node.
const auto * node_nested = subcolumns.findParent(current_node,
[](const auto & candidate) { return candidate.isNested(); });
if (!node_nested)
break;
/// If there are no leaves, skip current node and find
/// the next node up to the current.
leaf = subcolumns.findLeaf(node_nested,
[&](const auto & candidate)
{
return candidate.data.size() == entry->data.size() + 1;
});
if (leaf)
break;
current_node = node_nested->parent;
++num_skipped_nested;
}
if (!leaf)
return false;
auto last_field = leaf->data.getLastField();
if (last_field.isNull())
return false;
const auto & least_common_type = entry->data.getLeastCommonType();
size_t num_dimensions = getNumberOfDimensions(*least_common_type);
assert(num_skipped_nested < num_dimensions);
/// Replace scalars to default values with consistent array sizes.
size_t num_dimensions_to_keep = num_dimensions - num_skipped_nested;
auto default_scalar = num_skipped_nested
? createEmptyArrayField(num_skipped_nested)
: getBaseTypeOfArray(least_common_type)->getDefault();
auto default_field = applyVisitor(FieldVisitorReplaceScalars(default_scalar, num_dimensions_to_keep), last_field);
entry->data.insert(std::move(default_field));
return true;
}
}
template <typename Parser>
template <typename Reader>
void SerializationObject<Parser>::deserializeTextImpl(IColumn & column, Reader && reader) const
@ -159,7 +94,7 @@ void SerializationObject<Parser>::deserializeTextImpl(IColumn & column, Reader &
{
if (!paths_set.has(entry->path.getPath()))
{
bool inserted = tryInsertDefaultFromNested(entry, subcolumns);
bool inserted = column_object.tryInsertDefaultFromNested(entry);
if (!inserted)
entry->data.insertDefault();
}

View File

@ -0,0 +1,15 @@
#include <Disks/ObjectStorages/S3/S3Capabilities.h>
namespace DB
{
S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
return S3Capabilities
{
.support_batch_delete = config.getBool(config_prefix + ".support_batch_delete", true),
.support_proxy = config.getBool(config_prefix + ".support_proxy", config.has(config_prefix + ".proxy")),
};
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <string>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
/// Supported/unsupported features of different S3 implementations.
/// Can be useful only for versions that are almost compatible with AWS S3.
struct S3Capabilities
{
/// Google S3 implementation doesn't support batch delete
/// TODO: possibly we have to use Google SDK https://github.com/googleapis/google-cloud-cpp/tree/main/google/cloud/storage
/// because it looks like it misses a lot of features, such as:
/// 1) batch delete
/// 2) list_v2
/// 3) multipart upload works differently
bool support_batch_delete{true};
/// Y.Cloud S3 implementation supports proxy for connection
bool support_proxy{false};
};
S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
}

View File

@ -17,6 +17,7 @@
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
@ -213,18 +214,34 @@ void S3ObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & chi
void S3ObjectStorage::removeObject(const std::string & path)
{
auto client_ptr = client.get();
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(path);
auto settings_ptr = s3_settings.get();
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects({obj});
// If batch delete is not supported, use a single delete request per object.
// This allows us to work with GCS, which doesn't support DeleteObjects
if (!s3_capabilities.support_batch_delete)
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(path);
auto outcome = client_ptr->DeleteObject(request);
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client_ptr->DeleteObjects(request);
throwIfError(outcome);
}
else
{
/// TODO: For AWS we prefer to use the multi-object operation even for a single object.
/// Maybe we shouldn't?
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(path);
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects({obj});
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client_ptr->DeleteObjects(request);
throwIfError(outcome);
throwIfError(outcome);
}
}
void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
@ -235,31 +252,39 @@ void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
auto client_ptr = client.get();
auto settings_ptr = s3_settings.get();
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
size_t current_position = 0;
while (current_position < paths.size())
if (!s3_capabilities.support_batch_delete)
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
String keys;
for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
for (const auto & path : paths)
removeObject(path);
}
else
{
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
size_t current_position = 0;
while (current_position < paths.size())
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(paths[current_position]);
current_chunk.push_back(obj);
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
String keys;
for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(paths[current_position]);
current_chunk.push_back(obj);
if (!keys.empty())
keys += ", ";
keys += paths[current_position];
if (!keys.empty())
keys += ", ";
keys += paths[current_position];
}
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client_ptr->DeleteObjects(request);
throwIfError(outcome);
}
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client_ptr->DeleteObjects(request);
throwIfError(outcome);
}
}
@ -493,7 +518,7 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(const std::s
return std::make_unique<S3ObjectStorage>(
nullptr, getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
version_id, new_namespace);
version_id, s3_capabilities, new_namespace);
}
}

View File

@ -5,6 +5,7 @@
#if USE_AWS_S3
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/S3/S3Capabilities.h>
#include <memory>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/HeadObjectResult.h>
@ -46,11 +47,13 @@ public:
std::unique_ptr<Aws::S3::S3Client> && client_,
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
String version_id_,
const S3Capabilities & s3_capabilities_,
String bucket_)
: IObjectStorage(std::move(cache_))
, bucket(bucket_)
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
{}
@ -129,6 +132,7 @@ private:
MultiVersion<Aws::S3::S3Client> client;
MultiVersion<S3ObjectStorageSettings> s3_settings;
const S3Capabilities s3_capabilities;
const String version_id;
};

View File

@ -89,11 +89,12 @@ void registerDiskS3(DiskFactory & factory)
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
S3Capabilities s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
ObjectStoragePtr s3_storage = std::make_unique<S3ObjectStorage>(
std::move(cache), getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, uri.bucket);
uri.version_id, s3_capabilities, uri.bucket);
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);

View File

@ -13,6 +13,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeInterval.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>
@ -27,6 +28,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
// Interface for true window functions. It's not much of an interface, they just
@ -2200,6 +2202,109 @@ struct WindowFunctionNthValue final : public WindowFunction
}
};
struct NonNegativeDerivativeState
{
Float64 previous_metric = 0;
Float64 previous_timestamp = 0;
};
// nonNegativeDerivative(metric_column, timestamp_column[, INTERVAL 1 SECOND])
struct WindowFunctionNonNegativeDerivative final : public StatefulWindowFunction<NonNegativeDerivativeState>
{
static constexpr size_t ARGUMENT_METRIC = 0;
static constexpr size_t ARGUMENT_TIMESTAMP = 1;
static constexpr size_t ARGUMENT_INTERVAL = 2;
WindowFunctionNonNegativeDerivative(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_)
: StatefulWindowFunction(name_, argument_types_, parameters_)
{
if (!parameters.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} cannot be parameterized", name_);
}
if (argument_types.size() != 2 && argument_types.size() != 3)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} takes 2 or 3 arguments", name_);
}
if (!isNumber(argument_types[ARGUMENT_METRIC]))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Argument {} must be a number, '{}' given",
ARGUMENT_METRIC,
argument_types[ARGUMENT_METRIC]->getName());
}
if (!isDateTime(argument_types[ARGUMENT_TIMESTAMP]) && !isDateTime64(argument_types[ARGUMENT_TIMESTAMP]))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Argument {} must be DateTime or DateTime64, '{}' given",
ARGUMENT_TIMESTAMP,
argument_types[ARGUMENT_TIMESTAMP]->getName());
}
if (argument_types.size() == 3)
{
const DataTypeInterval * interval_datatype = checkAndGetDataType<DataTypeInterval>(argument_types[ARGUMENT_INTERVAL].get());
if (!interval_datatype)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Argument {} must be an INTERVAL, '{}' given",
ARGUMENT_INTERVAL,
argument_types[ARGUMENT_INTERVAL]->getName());
}
if (!interval_datatype->getKind().isFixedLength())
{
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The INTERVAL must be a week or shorter, '{}' given",
argument_types[ARGUMENT_INTERVAL]->getName());
}
interval_length = interval_datatype->getKind().toAvgSeconds();
interval_specified = true;
}
}
DataTypePtr getReturnType() const override { return argument_types[0]; }
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{
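// Take the delta between the current and the previous row, divide by the
// elapsed time, scale to the interval length, and clamp negative rates to zero.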
const auto & current_block = transform->blockAt(transform->current_row);
const auto & workspace = transform->workspaces[function_index];
auto & state = getState(workspace);
auto interval_duration = interval_specified ? interval_length *
(*current_block.input_columns[workspace.argument_column_indices[ARGUMENT_INTERVAL]]).getFloat64(0) : 1;
Float64 last_metric = state.previous_metric;
Float64 last_timestamp = state.previous_timestamp;
Float64 curr_metric = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_METRIC, transform->current_row);
Float64 curr_timestamp = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIMESTAMP, transform->current_row);
Float64 time_elapsed = curr_timestamp - last_timestamp;
Float64 metric_diff = curr_metric - last_metric;
Float64 result = (time_elapsed != 0) ? (metric_diff / time_elapsed * interval_duration) : 0;
state.previous_metric = curr_metric;
state.previous_timestamp = curr_timestamp;
WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result >= 0 ? result : 0);
}
private:
Float64 interval_length = 1;
bool interval_specified = false;
};
void registerWindowFunctions(AggregateFunctionFactory & factory)
{
@ -2299,6 +2404,13 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
return std::make_shared<WindowFunctionExponentialTimeDecayedAvg>(
name, argument_types, parameters);
}, properties});
factory.registerFunction("nonNegativeDerivative", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{
return std::make_shared<WindowFunctionNonNegativeDerivative>(
name, argument_types, parameters);
}, properties});
}
}
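An editorial sketch of the semantics, using invented inline data (the query below is not part of the commit): the function divides the metric delta by the elapsed time, rescales it to the given interval, and clamps negative rates to zero; the very first row is computed against a zero-initialized state, which is why the leading values in the reference output further down are near zero.

```sql
-- Hypothetical illustration; the table function call and values are invented.
SELECT
    ts,
    metric,
    nonNegativeDerivative(metric, ts, INTERVAL 1 SECOND)
        OVER (ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rate
FROM values('ts DateTime64(3), metric Float64',
    ('2022-01-01 00:00:00.000', 10.0),   -- first row: delta taken against the zero state -> ~0
    ('2022-01-01 00:00:02.000', 16.0),   -- (16 - 10) / 2 s * 1 s = 3
    ('2022-01-01 00:00:03.000', 12.0));  -- (12 - 16) / 1 s is negative -> clamped to 0
```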

View File

@ -13,3 +13,6 @@ Tuple(arr Nested(k11 Int8, k22 String, k33 Int8), k1 Int8, k2 String, k3 String)
{"data":{"k1":1,"k10":[{"a":"1","b":"2","c":{"k11":""}},{"a":"2","b":"3","c":{"k11":""}}]}}
{"data":{"k1":2,"k10":[{"a":"1","b":"2","c":{"k11":"haha"}}]}}
Tuple(k1 Int8, k10 Nested(a String, b String, c Tuple(k11 String)))
{"data":{"k1":1,"k10":[{"a":"1","b":"2","c":{"k11":""}},{"a":"2","b":"3","c":{"k11":""}}]}}
{"data":{"k1":2,"k10":[{"a":"1","b":"2","c":{"k11":"haha"}}]}}
Tuple(k1 Int8, k10 Nested(a String, b String, c Tuple(k11 String)))

View File

@ -38,18 +38,29 @@ DROP TABLE type_json_dst;
CREATE TABLE type_json_dst (data JSON) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE type_json_src (data String) ENGINE = MergeTree ORDER BY tuple();
SYSTEM STOP MERGES type_json_src;
SET max_threads = 1;
SET max_insert_threads = 1;
SET output_format_json_named_tuples_as_objects = 1;
INSERT INTO type_json_src FORMAT JSONAsString {"k1": 1, "k10": [{"a": "1", "b": "2"}, {"a": "2", "b": "3"}]};
INSERT INTO type_json_src FORMAT JSONAsString {"k1": 2, "k10": [{"a": "1", "b": "2", "c": {"k11": "haha"}}]};
-- Temporarily fix the test by optimizing the data into one part.
-- If the order of insertion of the two lines above changes,
-- which can happen during insertion with multiple threads,
-- this test will fail. TODO: fix this.
OPTIMIZE TABLE type_json_src FINAL;
INSERT INTO type_json_dst SELECT data FROM type_json_src;
SELECT * FROM type_json_dst ORDER BY data.k1 FORMAT JSONEachRow;
SELECT toTypeName(data) FROM type_json_dst LIMIT 1;
TRUNCATE TABLE type_json_src;
TRUNCATE TABLE type_json_dst;
-- Insert in another order. Order is important, because the way defaults are filled differs.
INSERT INTO type_json_src FORMAT JSONAsString {"k1": 2, "k10": [{"a": "1", "b": "2", "c": {"k11": "haha"}}]};
INSERT INTO type_json_src FORMAT JSONAsString {"k1": 1, "k10": [{"a": "1", "b": "2"}, {"a": "2", "b": "3"}]};
INSERT INTO type_json_dst SELECT data FROM type_json_src;
SET output_format_json_named_tuples_as_objects = 1;
SELECT * FROM type_json_dst ORDER BY data.k1 FORMAT JSONEachRow;
SELECT toTypeName(data) FROM type_json_dst LIMIT 1;

View File

@ -0,0 +1,64 @@
1
1979-12-12 21:21:21.123 1.1 3.5045052519931732e-9
1979-12-12 21:21:22.000 1.3345 0.26738883339230357
1979-12-12 21:21:23.000 1.54 0.20550000000000002
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:21.124 2.34 0
1979-12-12 21:21:21.127 3.7 453.33916989529325
1979-12-12 21:21:21.123 1.1 1.0513515755979521e-17
1979-12-12 21:21:22.000 1.3345 8.021665001769108e-10
1979-12-12 21:21:23.000 1.54 6.165000000000001e-10
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:21.124 2.34 0
1979-12-12 21:21:21.127 3.7 0.0000013600175096858798
1979-12-12 21:21:21.123 1.1 1.4018021007972692e-14
1979-12-12 21:21:22.000 1.3345 0.0000010695553335692141
1979-12-12 21:21:23.000 1.54 8.22e-7
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:21.124 2.34 0
1979-12-12 21:21:21.127 3.7 0.001813356679581173
1979-12-12 21:21:21.123 1.1 1.7522526259965866e-11
1979-12-12 21:21:22.000 1.3345 0.0013369441669615178
1979-12-12 21:21:23.000 1.54 0.0010275000000000002
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:21.124 2.34 0
1979-12-12 21:21:21.127 3.7 2.2666958494764664
1979-12-12 21:21:21.123 1.1 2.102703151195904e-8
1979-12-12 21:21:22.000 1.3345 1.6043330003538214
1979-12-12 21:21:23.000 1.54 1.233
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:21.124 2.34 0
1979-12-12 21:21:21.127 3.7 2720.0350193717595
1979-12-12 21:21:21.123 1.1 0.0000014718922058371327
1979-12-12 21:21:22.000 1.3345 112.3033100247675
1979-12-12 21:21:23.000 1.54 86.31
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:21.124 2.34 0
1979-12-12 21:21:21.127 3.7 190402.45135602317
1979-12-12 21:21:21.123 1.1 0.0001009297512574034
1979-12-12 21:21:21.124 2.34 35712459.78375156
1979-12-12 21:21:21.127 3.7 13056168.092984445
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:22.000 1.3345 0
1979-12-12 21:21:23.000 1.54 5918.400000000001
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.123 1.1 0.0027251032839498914
1979-12-12 21:21:21.124 2.34 964236414.1612921
1979-12-12 21:21:21.127 3.7 352516538.51058006
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:22.000 1.3345 0
1979-12-12 21:21:23.000 1.54 159796.80000000002
1979-12-12 21:21:23.000 1.54 0
1979-12-12 21:21:21.123 1.1 0.021195247764054712
1979-12-12 21:21:21.124 2.34 7499616554.587828
1979-12-12 21:21:21.127 3.7 2741795299.5267334
1979-12-12 21:21:21.129 2.1 0
1979-12-12 21:21:22.000 1.3345 0
1979-12-12 21:21:23.000 1.54 1242864
1979-12-12 21:21:23.000 1.54 0
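An editorial spot check of the INTERVAL 6 SECOND block above (not part of the reference file): judging by the output, the row `1979-12-12 21:21:23.000 1.54 1.233` follows from the preceding output row (metric 1.3345 at 21:21:22.000), one second earlier:

```sql
-- Editorial sanity check: delta over elapsed time, rescaled to a 6-second interval.
SELECT (1.54 - 1.3345) / 1 * 6 AS deriv;  -- 1.233
```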

View File

@ -0,0 +1,63 @@
DROP TABLE IF EXISTS nnd;
CREATE TABLE nnd
(
id Int8, ts DateTime64(3, 'UTC'), metric Float64
)
ENGINE=MergeTree()
ORDER BY id;
INSERT INTO nnd VALUES (1, toDateTime64('1979-12-12 21:21:21.123', 3, 'UTC'), 1.1), (2, toDateTime64('1979-12-12 21:21:21.124', 3, 'UTC'), 2.34), (3, toDateTime64('1979-12-12 21:21:21.127', 3, 'UTC'), 3.7);
INSERT INTO nnd VALUES (4, toDateTime64('1979-12-12 21:21:21.129', 3, 'UTC'), 2.1), (5, toDateTime('1979-12-12 21:21:22', 'UTC'), 1.3345), (6, toDateTime('1979-12-12 21:21:23', 'UTC'), 1.54), (7, toDateTime('1979-12-12 21:21:23', 'UTC'), 1.54);
-- shall work for precise intervals
-- INTERVAL 1 SECOND shall be the default
SELECT (
SELECT
ts,
metric,
nonNegativeDerivative(metric, ts) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv
FROM nnd
LIMIT 5, 1
) = (
SELECT
ts,
metric,
nonNegativeDerivative(metric, ts, toIntervalSecond(1)) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv
FROM nnd
LIMIT 5, 1
);
SELECT ts, metric, nonNegativeDerivative(metric, ts) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Nanosecond
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 3 NANOSECOND) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Microsecond
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 4 MICROSECOND) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Millisecond
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 5 MILLISECOND) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Second
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 6 SECOND) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Minute
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 7 MINUTE) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Hour
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 8 HOUR) OVER (ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Day
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 9 DAY) OVER (ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- Week
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 10 WEEK) OVER (ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd;
-- shall not work for month, quarter, year (intervals with a variable number of seconds)
-- Month
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 11 MONTH) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
-- Quarter
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 12 QUARTER) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
-- Year
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 13 YEAR) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
-- test against wrong arguments/types
SELECT ts, metric, nonNegativeDerivative(metric, 1, INTERVAL 3 NANOSECOND) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError BAD_ARGUMENTS }
SELECT ts, metric, nonNegativeDerivative('string not datetime', ts, INTERVAL 3 NANOSECOND) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError BAD_ARGUMENTS }
SELECT ts, metric, nonNegativeDerivative(metric, ts, INTERVAL 3 NANOSECOND, id) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError BAD_ARGUMENTS }
SELECT ts, metric, nonNegativeDerivative(metric) OVER (PARTITION BY metric ORDER BY ts ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS deriv FROM nnd; -- { serverError BAD_ARGUMENTS }
-- cleanup
DROP TABLE IF EXISTS nnd;

View File

@ -235,7 +235,7 @@ int compressFiles(char* filenames[], int count, int output_fd, int level, const
continue;
}
printf("Size: %ld\n", info_in.st_size);
printf("Size: %lld\n", info_in.st_size);
/// Save umask
files_data[i].umask = info_in.st_mode;

View File

@ -3,7 +3,11 @@
//#include <cstring>
#include <zstd.h>
#include <sys/mman.h>
#if defined __APPLE__
#include <sys/mount.h>
#else
#include <sys/statfs.h>
#endif
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
@ -51,7 +55,7 @@ int decompress(char * input, char * output, off_t start, off_t end, size_t max_n
size = ZSTD_findFrameCompressedSize(input + in_pointer, max_block_size);
if (ZSTD_isError(size))
{
fprintf(stderr, "Error (ZSTD): %zu %s\n", size, ZSTD_getErrorName(size));
fprintf(stderr, "Error (ZSTD): %lld %s\n", size, ZSTD_getErrorName(size));
error_happened = true;
break;
}
@ -59,7 +63,7 @@ int decompress(char * input, char * output, off_t start, off_t end, size_t max_n
decompressed_size = ZSTD_getFrameContentSize(input + in_pointer, max_block_size);
if (ZSTD_isError(decompressed_size))
{
fprintf(stderr, "Error (ZSTD): %zu %s\n", decompressed_size, ZSTD_getErrorName(decompressed_size));
fprintf(stderr, "Error (ZSTD): %lld %s\n", decompressed_size, ZSTD_getErrorName(decompressed_size));
error_happened = true;
break;
}
@ -170,7 +174,7 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
}
if (fs_info.f_blocks * info_in.st_blksize < decompressed_full_size)
{
fprintf(stderr, "Not enough space for decompression. Have %lu, need %zu.",
fprintf(stderr, "Not enough space for decompression. Have %llu, need %zu.",
fs_info.f_blocks * info_in.st_blksize, decompressed_full_size);
return 1;
}

View File

@ -12,12 +12,12 @@ apt update
apt install sudo python pip git
pip3 install -r requirements.txt
git config --global --add safe.directory /workspace
./build.py --skip-multi-page --skip-blog --skip-docs --livereload 8080
./build.py --livereload 8080
```
```
cd ../docs/tools
sudo apt install python-3 pip
sudo apt install python3 pip
pip3 install -r requirements.txt
virtualenv build