This commit is contained in:
Alexey Milovidov 2021-02-11 01:23:27 +03:00
parent d3dba0e52a
commit f442b30f30
3 changed files with 37 additions and 31 deletions

View File

@ -108,7 +108,6 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{
//std::cerr << "readBig " << file_in.getFileName() << "\n";
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;

View File

@ -83,9 +83,9 @@ struct ArrayDifferenceImpl
}
res_ptr = ColumnArray::create(std::move(res_nested), array.getOffsetsPtr());
return true;
}
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
ColumnPtr res;
@ -107,7 +107,6 @@ struct ArrayDifferenceImpl
else
throw Exception("Unexpected column for arrayDifference: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);
}
};
struct NameArrayDifference { static constexpr auto name = "arrayDifference"; };

View File

@ -213,9 +213,6 @@ void TCPHandler::runImpl()
/// Get blocks of temporary tables
readData(connection_settings);
if (state.io.out)
state.io.out->writeSuffix();
/// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset();
@ -1183,34 +1180,45 @@ bool TCPHandler::receiveData(bool scalar)
if (block)
{
if (scalar)
{
/// Scalar value
query_context->addScalar(temporary_id.table_name, block);
}
else if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
{
/// Data for external tables
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
StoragePtr storage;
/// If such a table does not exist, create it.
if (resolved)
{
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
}
else
{
NamesAndTypesList columns = block.getNamesAndTypesList();
std::cerr << columns.toString() << "\n";
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
auto temporary_table_out = storage->write(ASTPtr(), metadata_snapshot, *query_context);
temporary_table_out->write(block);
temporary_table_out->writeSuffix();
}
else if (state.need_receive_data_for_input)
{
/// 'input' table function.
state.block_for_input = block;
}
else
{
/// If there is an insert request, then the data should be written directly to `state.io.out`.
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input && !state.io.out)
{
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
StoragePtr storage;
/// If such a table does not exist, create it.
if (resolved)
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
else
{
NamesAndTypesList columns = block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
state.io.out = storage->write(ASTPtr(), metadata_snapshot, *query_context);
}
if (state.need_receive_data_for_input)
state.block_for_input = block;
else
state.io.out->write(block);
/// INSERT query.
state.io.out->write(block);
}
return true;
}