Merge pull request #64947 from ilejn/time_virtual_col

Add _time virtual column to file-like storages
This commit is contained in:
Kseniia Sumarokova 2024-06-10 12:14:46 +00:00 committed by GitHub
commit 9a55cdf77c
26 changed files with 109 additions and 48 deletions


@ -54,6 +54,7 @@ SELECT * FROM test_table;
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
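For illustration, a minimal usage sketch against the `test_table` from the example above (the table name is taken from that example; `_time` is `NULL` when the blob exposes no modification time):

```sql
-- List the backing blobs together with their size and last modification time.
SELECT DISTINCT _path, _size, _time
FROM test_table;
```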
## See also


@ -235,6 +235,7 @@ libhdfs3 support HDFS namenode HA.
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
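A hedged sketch, assuming the `hdfs_engine_table` created earlier on this page:

```sql
-- Count only rows read from files modified within the last day; _time is NULL if unknown.
SELECT count()
FROM hdfs_engine_table
WHERE _time > now() - INTERVAL 1 DAY;
```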
## Storage Settings {#storage-settings}


@ -145,6 +145,7 @@ Code: 48. DB::Exception: Received from localhost:9000. DB::Exception: Reading fr
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).
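For example, a sketch assuming the `s3_engine_table` from the examples above:

```sql
-- Row counts per source object, with each object's modification time.
SELECT _file, _time, count() AS rows
FROM s3_engine_table
GROUP BY _file, _time;
```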


@ -102,6 +102,7 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
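As a sketch, assuming a table named `file_engine_table` as in the engine example above:

```sql
-- Inspect the virtual columns alongside a few data rows.
SELECT *, _path, _file, _size, _time
FROM file_engine_table
LIMIT 5;
```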
## Settings {#settings}


@ -108,6 +108,7 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
- `_path` — Path to the `URL`. Type: `LowCardinality(String)`.
- `_file` — Resource name of the `URL`. Type: `LowCardinality(String)`.
- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the resource. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
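A minimal sketch, assuming the `url_engine_table` defined earlier on this page (`_time` may simply be `NULL` when no modification time is available for the resource):

```sql
SELECT _file, _size, _time
FROM url_engine_table;
```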
## Storage Settings {#storage-settings}


@ -72,6 +72,7 @@ SELECT count(*) FROM azureBlobStorage('DefaultEndpointsProtocol=https;AccountNam
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
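A hedged sketch with a placeholder connection string and a hypothetical blob pattern (neither comes from this PR); the arguments follow the signature shown above:

```sql
-- '<connection_string>', 'cont' and 'data_*.csv' are placeholders.
SELECT _file, _size, _time
FROM azureBlobStorage('<connection_string>', 'cont', 'data_*.csv');
```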
**See Also**


@ -196,6 +196,7 @@ SELECT count(*) FROM file('big_dir/**/file002', 'CSV', 'name String, value UInt3
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
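For example, a sketch that filters by modification time over a hypothetical directory pattern:

```sql
-- Read only files changed in the last 10 minutes.
SELECT _file, _time, count() AS rows
FROM file('big_dir/**/*.csv', 'CSV', 'name String, value UInt32')
WHERE _time > now() - INTERVAL 10 MINUTE
GROUP BY _file, _time;
```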
## Settings {#settings}


@ -97,6 +97,7 @@ FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name Strin
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
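A hedged sketch reusing the cluster address from the examples above:

```sql
-- Show each file's path, size, and last modification time.
SELECT DISTINCT _path, _size, _time
FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32');
```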
## Storage Settings {#storage-settings}


@ -272,6 +272,7 @@ FROM s3(
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
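For illustration, a sketch with a hypothetical bucket and path:

```sql
-- Newest objects first; _time is NULL when the modification time is unknown.
SELECT _file, _size, _time
FROM s3('https://my-bucket.s3.amazonaws.com/data/*.csv', 'CSVWithNames')
ORDER BY _time DESC
LIMIT 10;
```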
## Storage Settings {#storage-settings}


@ -53,6 +53,7 @@ Character `|` inside patterns is used to specify failover addresses. They are it
- `_path` — Path to the `URL`. Type: `LowCardinality(String)`.
- `_file` — Resource name of the `URL`. Type: `LowCardinality(String)`.
- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the resource. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
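A minimal sketch with a hypothetical URL (`_time` may be `NULL` if the server reports no modification time):

```sql
SELECT _file, _size, _time
FROM url('https://example.com/data.csv', 'CSVWithNames');
```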
## Storage Settings {#storage-settings}


@ -5,6 +5,7 @@
#include <functional>
#include <memory>
#include <Poco/Timestamp.h>
namespace DB
{
@ -25,6 +26,7 @@ public:
{
UInt64 uncompressed_size;
UInt64 compressed_size;
Poco::Timestamp last_modified;
bool is_encrypted;
};


@ -157,6 +157,7 @@ public:
file_info.emplace();
file_info->uncompressed_size = archive_entry_size(current_entry);
file_info->compressed_size = archive_entry_size(current_entry);
file_info->last_modified = archive_entry_mtime(current_entry);
file_info->is_encrypted = false;
}


@ -195,12 +195,14 @@ Chunk StorageObjectStorageSource::generate()
const auto & object_info = reader.getObjectInfo();
const auto & filename = object_info.getFileName();
chassert(object_info.metadata);
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(
chunk,
read_from_format_info.requested_virtual_columns,
getUniqueStoragePathIdentifier(*configuration, reader.getObjectInfo(), false),
object_info.metadata->size_bytes, &filename);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, read_from_format_info.requested_virtual_columns,
{
.path = getUniqueStoragePathIdentifier(*configuration, reader.getObjectInfo(), false),
.size = object_info.metadata->size_bytes,
.filename = &filename,
.last_modified = object_info.metadata->last_modified
});
return chunk;
}


@ -421,8 +421,14 @@ Chunk StorageS3QueueSource::generate()
file_status->processed_rows += chunk.getNumRows();
processed_rows_from_file += chunk.getNumRows();
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(
chunk, requested_virtual_columns, path, reader.getObjectInfo().metadata->size_bytes);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
{
.path = path,
.size = reader.getObjectInfo().metadata->size_bytes
});
return chunk;
}
}


@ -1341,6 +1341,7 @@ Chunk StorageFileSource::generate()
chassert(file_enumerator);
current_path = fmt::format("{}::{}", archive_reader->getPath(), *filename_override);
current_file_size = file_enumerator->getFileInfo().uncompressed_size;
current_file_last_modified = file_enumerator->getFileInfo().last_modified;
if (need_only_count && tryGetCountFromCache(current_archive_stat))
continue;
@ -1370,6 +1371,7 @@ Chunk StorageFileSource::generate()
struct stat file_stat;
file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName());
current_file_size = file_stat.st_size;
current_file_last_modified = Poco::Timestamp::fromEpochTime(file_stat.st_mtime);
if (getContext()->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
@ -1436,8 +1438,15 @@ Chunk StorageFileSource::generate()
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
/// Enrich with virtual columns.
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(
chunk, requested_virtual_columns, current_path, current_file_size, filename_override.has_value() ? &filename_override.value() : nullptr);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
{
.path = current_path,
.size = current_file_size,
.filename = (filename_override.has_value() ? &filename_override.value() : nullptr),
.last_modified = current_file_last_modified
});
return chunk;
}


@ -279,6 +279,7 @@ private:
FilesIteratorPtr files_iterator;
String current_path;
std::optional<size_t> current_file_size;
std::optional<Poco::Timestamp> current_file_last_modified;
struct stat current_archive_stat;
std::optional<String> filename_override;
Block sample_block;


@ -411,7 +411,12 @@ Chunk StorageURLSource::generate()
if (input_format)
chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, curr_uri.getPath(), current_file_size);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
{
.path = curr_uri.getPath(),
.size = current_file_size
});
return chunk;
}


@ -26,6 +26,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -111,7 +112,7 @@ void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context)
NameSet getVirtualNamesForFileLikeStorage()
{
return {"_path", "_file", "_size"};
return {"_path", "_file", "_size", "_time"};
}
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns)
@ -129,6 +130,7 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription
add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
return desc;
}
@ -187,32 +189,40 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
return block.getByName("_idx").column;
}
void addRequestedPathFileAndSizeVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, std::optional<size_t> size, const String * filename)
void addRequestedFileLikeStorageVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
VirtualsForFileLikeStorage virtual_values)
{
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), path)->convertToFullColumnIfConst());
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), virtual_values.path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
if (filename)
if (virtual_values.filename)
{
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *filename)->convertToFullColumnIfConst());
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), (*virtual_values.filename))->convertToFullColumnIfConst());
}
else
{
size_t last_slash_pos = path.find_last_of('/');
auto filename_from_path = path.substr(last_slash_pos + 1);
size_t last_slash_pos = virtual_values.path.find_last_of('/');
auto filename_from_path = virtual_values.path.substr(last_slash_pos + 1);
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), filename_from_path)->convertToFullColumnIfConst());
}
}
else if (virtual_column.name == "_size")
{
if (size)
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *size)->convertToFullColumnIfConst());
if (virtual_values.size)
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *virtual_values.size)->convertToFullColumnIfConst());
else
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_time")
{
if (virtual_values.last_modified)
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), virtual_values.last_modified->epochTime())->convertToFullColumnIfConst());
else
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
}


@ -68,8 +68,18 @@ void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & pa
sources = std::move(filtered_sources);
}
void addRequestedPathFileAndSizeVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, std::optional<size_t> size, const String * filename = nullptr);
struct VirtualsForFileLikeStorage
{
const String & path;
std::optional<size_t> size { std::nullopt };
const String * filename { nullptr };
std::optional<Poco::Timestamp> last_modified { std::nullopt };
};
void addRequestedFileLikeStorageVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
VirtualsForFileLikeStorage virtual_values);
}
}


@ -758,12 +758,12 @@ def test_read_subcolumns(cluster):
)
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.tsv',"
f"select a.b.d, _path, a.b, _file, dateDiff('minute', _time, now()), a.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.tsv',"
f" 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto',"
f" 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\tcont/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
assert res == "2\tcont/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t0\t3\n"
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.jsonl',"


@ -987,10 +987,10 @@ def test_read_subcolumns(started_cluster):
assert res == "2\ttest_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
f"select x.b.d, _path, x.b, _file, dateDiff('minute', _time, now()), x.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "0\ttest_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
assert res == "0\ttest_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\t0\n"
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"


@ -2117,10 +2117,12 @@ def test_read_subcolumns(started_cluster):
assert res == "0\troot/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
res = instance.query(
f"select x.b.d, _path, x.b, _file, x.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
f"select x.b.d, _path, x.b, _file, dateDiff('minute', _time, now()), x.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
)
assert res == "42\troot/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
assert (
res == "42\troot/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t0\t42\n"
)
res = instance.query(
f"select a.b.d, _path, a.b, _file, a.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
@ -2148,6 +2150,8 @@ def test_read_subcolumns(started_cluster):
res == "42\t/root/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
)
logging.info("Some custom logging")
def test_filtering_by_file_or_path(started_cluster):
bucket = started_cluster.minio_bucket


@ -1,13 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "1,2" > $CLICKHOUSE_TEST_UNIQUE_NAME.csv
$CLICKHOUSE_LOCAL -nm -q "
create table test (x UInt64, y UInt32, size UInt64) engine=Memory;
insert into test select c1, c2, _size from file('$CLICKHOUSE_TEST_UNIQUE_NAME.csv') settings use_structure_from_insertion_table_in_table_functions=1;
select * from test;
"
rm $CLICKHOUSE_TEST_UNIQUE_NAME.csv


@ -0,0 +1,14 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "1,2" > $CLICKHOUSE_TEST_UNIQUE_NAME.csv
sleep 1
$CLICKHOUSE_LOCAL -nm -q "
create table test (x UInt64, y UInt32, size UInt64, d32 DateTime32, d64 DateTime64) engine=Memory;
insert into test select c1, c2, _size, _time, _time from file('$CLICKHOUSE_TEST_UNIQUE_NAME.csv') settings use_structure_from_insertion_table_in_table_functions=1;
select x, y, size, (dateDiff('millisecond', d32, now()) < 4000 AND dateDiff('millisecond', d32, now()) > 0), (dateDiff('second', d64, now()) < 4 AND dateDiff('second', d64, now()) > 0) from test;
"
rm $CLICKHOUSE_TEST_UNIQUE_NAME.csv