Fix tests

This commit is contained in:
avogar 2022-01-18 22:26:13 +03:00
parent b318f9b5db
commit 4efadfad3c
7 changed files with 59 additions and 33 deletions

View File

@ -132,6 +132,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
Pos before_values = pos;
String format_str;
bool values = false;
/// VALUES or FORMAT or SELECT or WITH or WATCH.
/// After FROM INFILE we expect FORMAT, SELECT, WITH or nothing.
@ -140,6 +141,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
/// If VALUES is defined in query, everything except setting will be parsed as data
data = pos->begin;
format_str = "Values";
values = true;
}
else if (s_format.ignore(pos, expected))
{
@ -184,6 +186,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserSetQuery parser_settings(true);
if (!parser_settings.parse(pos, settings_ast, expected))
return false;
if (values)
data = pos->begin;
}
if (select)

View File

@ -214,20 +214,34 @@ private:
class HDFSSource::URISIterator::Impl
{
public:
Impl(const std::vector<const String> & uris_) : uris(uris_), index(0)
explicit Impl(const std::vector<const String> & uris_, ContextPtr context)
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
for (const auto & uri : uris_)
{
path_and_uri = getPathFromUriAndUriWithoutPath(uri);
if (!hdfsExists(fs.get(), path_and_uri.first.c_str()))
uris.push_back(uri);
}
uris_iter = uris.begin();
}
String next()
{
if (index == uris.size())
std::lock_guard lock(mutex);
if (uris_iter == uris.end())
return "";
return uris[index++];
auto key = *uris_iter;
++uris_iter;
return key;
}
private:
const std::vector<const String> & uris;
size_t index;
std::mutex mutex;
Strings uris;
Strings::iterator uris_iter;
};
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
@ -263,8 +277,8 @@ String HDFSSource::DisclosedGlobIterator::next()
return pimpl->next();
}
HDFSSource::URISIterator::URISIterator(const std::vector<const String> & uris_)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_))
HDFSSource::URISIterator::URISIterator(const std::vector<const String> & uris_, ContextPtr context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
{
}
@ -501,7 +515,7 @@ Pipe StorageHDFS::read(
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris);
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();

View File

@ -99,7 +99,7 @@ public:
class URISIterator
{
public:
URISIterator(const std::vector<const String> & uris_);
URISIterator(const std::vector<const String> & uris_, ContextPtr context);
String next();
private:
class Impl;

View File

@ -82,8 +82,6 @@ public:
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
: client(client_), globbed_uri(globbed_uri_)
{
std::lock_guard lock(mutex);
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@ -179,20 +177,24 @@ String StorageS3Source::DisclosedGlobIterator::next()
class StorageS3Source::KeysIterator::Impl
{
public:
explicit Impl(const std::vector<String> & keys_) : keys(keys_), index(0)
explicit Impl(const std::vector<String> & keys_) : keys(keys_), keys_iter(keys.begin())
{
}
String next()
{
if (index == keys.size())
std::lock_guard lock(mutex);
if (keys_iter == keys.end())
return "";
return keys[index++];
auto key = *keys_iter;
++keys_iter;
return key;
}
private:
const std::vector<String> & keys;
size_t index;
std::mutex mutex;
Strings keys;
Strings::iterator keys_iter;
};
StorageS3Source::KeysIterator::KeysIterator(const std::vector<String> & keys_) : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_))
@ -849,9 +851,10 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx)
{
std::vector<String> keys = {client_auth.uri.key};
auto read_buffer_creator = [&]()
{
auto file_iterator = createFileIterator(client_auth, {client_auth.uri.key}, is_key_with_globs, distributed_processing, ctx);
auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx);
String current_key = (*file_iterator)();
if (current_key.empty())
throw Exception(

View File

@ -366,12 +366,12 @@ def test_overwrite(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test as {table_function}")
node1.query(f"insert into test select number, randomString(100) from numbers(5)")
node1.query_and_get_error(f"insert into test select number, randomString(100) FROM numbers(10)")
node1.query(f"insert into test select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")
node1.query(f"create table test_overwrite as {table_function}")
node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(5)")
node1.query_and_get_error(f"insert into test_overwrite select number, randomString(100) FROM numbers(10)")
node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")
result = node1.query(f"select count() from test")
result = node1.query(f"select count() from test_overwrite")
assert(int(result) == 10)

View File

@ -10,6 +10,11 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet>
<s3_parquet_gz>
<url>http://minio1:9001/root/test_parquet_gz</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet_gz>
<s3_orc>
<url>http://minio1:9001/root/test_orc</url>
<access_key_id>minio</access_key_id>

View File

@ -136,7 +136,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test.csv"
put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
{maybe_auth}'CSV', '{table_format}', {compression}) values {values}"""
{maybe_auth}'CSV', '{table_format}', {compression}) values settings s3_truncate_on_insert=1 {values}"""
try:
run_query(instance, put_query)
@ -298,7 +298,7 @@ def test_put_csv(started_cluster, maybe_auth, positive):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV settings s3_truncate_on_insert=1".format(
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format)
csv_data = "8,9,16\n11,18,13\n22,14,2\n"
@ -322,7 +322,7 @@ def test_put_get_with_redirect(started_cluster):
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
values_csv = "1,1,1\n1,1,1\n11,11,11\n"
filename = "test.csv"
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
run_query(instance, query)
@ -350,12 +350,12 @@ def test_put_with_zero_redirect(started_cluster):
filename = "test.csv"
# Should work without redirect
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
run_query(instance, query)
# Should not work with redirect
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
exception_raised = False
try:
@ -805,13 +805,13 @@ def test_seekable_formats(started_cluster):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
@ -827,14 +827,14 @@ def test_seekable_formats_url(started_cluster):
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
@ -939,7 +939,7 @@ def test_create_new_files_on_insert(started_cluster):
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_multiple_inserts as {table_function}")
instance.query(f"truncate table test_multiple_inserts")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
@ -951,7 +951,7 @@ def test_create_new_files_on_insert(started_cluster):
table_function = f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_multiple_inserts as {table_function}")
instance.query(f"truncate table test_multiple_inserts")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")