mirror of
https://github.com/TREX-CoE/trexio.git
synced 2025-01-10 13:08:27 +01:00
447 lines
16 KiB
Python
447 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import numpy as np
|
|
import pytest
|
|
import trexio
|
|
from benzene_data import *
|
|
|
|
|
|
# this function is copied from the trexio-tools github repository (BSD-3 license):
|
|
# https://github.com/TREX-CoE/trexio_tools/blob/master/src/trexio_tools/group_tools/determinant.py
|
|
def to_determinant_list(orbital_list: list, int64_num: int) -> list:
|
|
"""
|
|
Convert a list of occupied orbitals from the `orbital_list`
|
|
into a list of Slater determinants (in their bit string representation).
|
|
|
|
Orbitals in the `orbital_list` should be 0-based, namely the lowest orbital has index 0, not 1.
|
|
|
|
int64_num is the number of 64-bit integers needed to represent a Slater determinant bit string.
|
|
It depends on the number of molecular orbitals as follows: int64_num = int((mo_num-1)/64) + 1
|
|
"""
|
|
|
|
if not isinstance(orbital_list, list):
|
|
raise TypeError(f"orbital_list should be a list, not {type(orbital_list)}")
|
|
|
|
det_list = []
|
|
bitfield = 0
|
|
shift = 0
|
|
|
|
# since orbital indices are 0-based but the code below works for 1-based --> increment the input indices by +1
|
|
orb_list_upshifted = [ orb+1 for orb in orbital_list]
|
|
|
|
# orbital list has to be sorted in increasing order for the bitfields to be set correctly
|
|
orb_list_sorted = sorted(orb_list_upshifted)
|
|
|
|
for orb in orb_list_sorted:
|
|
|
|
if orb-shift > 64:
|
|
# this removes the 0 bit from the beginning of the bitfield
|
|
bitfield = bitfield >> 1
|
|
# append a bitfield to the list
|
|
det_list.append(bitfield)
|
|
bitfield = 0
|
|
|
|
modulo = int((orb-1)/64)
|
|
shift = modulo*64
|
|
bitfield |= (1 << (orb-shift))
|
|
|
|
# this removes the 0 bit from the beginning of the bitfield
|
|
bitfield = bitfield >> 1
|
|
det_list.append(bitfield)
|
|
#print('Popcounts: ', [bin(d).count('1') for d in det_list)
|
|
#print('Bitfields: ', [bin(d) for d in det_list])
|
|
|
|
bitfield_num = len(det_list)
|
|
if bitfield_num > int64_num:
|
|
raise Exception(f'Number of bitfields {bitfield_num} cannot be more than the int64_num {int64_num}.')
|
|
if bitfield_num < int64_num:
|
|
for _ in range(int64_num - bitfield_num):
|
|
print("Appending an empty bitfield.")
|
|
det_list.append(0)
|
|
|
|
return det_list
|
|
|
|
|
|
def clean(back_end, filename):
|
|
"""Remove test files."""
|
|
if back_end == trexio.TREXIO_HDF5:
|
|
import os
|
|
try:
|
|
os.remove(filename)
|
|
os.remove('unsafe_' + filename)
|
|
except FileNotFoundError:
|
|
pass
|
|
else:
|
|
import shutil
|
|
try:
|
|
shutil.rmtree(filename)
|
|
shutil.rmtree('unsafe_' + filename)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def test_info():
|
|
"""Print the output of the trexio.info function."""
|
|
trexio.info()
|
|
|
|
|
|
def test_void():
|
|
"""Check raise of an error upon I/O on non-existing file."""
|
|
with pytest.raises(trexio.Error):
|
|
_ = trexio.File('void.file', 'r', trexio.TREXIO_TEXT)
|
|
|
|
|
|
def test_orbital_list():
|
|
"""Convert one determinant into a list of orbitals."""
|
|
orb_list_up, orb_list_dn = trexio.to_orbital_list_up_dn(int64_num, det_test)
|
|
assert orb_list_up[0] == 0
|
|
assert orb_list_dn[0] == 1
|
|
|
|
|
|
def test_bitfield_list():
|
|
"""Convert lists of occupied up- and down-spin orbitals into determinants."""
|
|
# convert det_test list into a numpy array for .all() assertion to work
|
|
det_test_np = np.array(det_test, dtype=np.int64)
|
|
|
|
det_list_up = trexio.to_bitfield_list(int64_num, orb_up_test)
|
|
assert (det_list_up == det_test_np[:int64_num]).all()
|
|
det_list_dn = trexio.to_bitfield_list(int64_num, orb_dn_test)
|
|
assert (det_list_dn == det_test_np[int64_num:]).all()
|
|
|
|
|
|
class TestIO:
|
|
"""Unit tests for writing/reading different blocks of the TREXIO file."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self, backend):
|
|
self.mode = 'w'
|
|
self.test_file = None
|
|
if backend == 'hdf5':
|
|
self.back_end = trexio.TREXIO_HDF5
|
|
self.filename = 'test_file_py.h5'
|
|
elif backend == 'text':
|
|
self.back_end = trexio.TREXIO_TEXT
|
|
self.filename = 'test_file_py.dir'
|
|
else:
|
|
raise ValueError("Wrong TREXIO back-end supplied to pytest.")
|
|
|
|
|
|
def test_clean(self):
|
|
"""Clean existing files."""
|
|
clean(self.back_end, self.filename)
|
|
|
|
|
|
#def __del__(self):
|
|
# """Class destructor."""
|
|
# if self.test_file:
|
|
# if self.test_file.isOpen:
|
|
# self.test_file.close()
|
|
def open(self, filename=None, mode=None, back_end=None):
|
|
"""Create a TREXIO file and open it for writing."""
|
|
if not filename:
|
|
filename = self.filename
|
|
else:
|
|
self.filename = filename
|
|
if not mode:
|
|
mode = self.mode
|
|
else:
|
|
self.mode = mode
|
|
if not back_end:
|
|
back_end = self.back_end
|
|
else:
|
|
self.back_end = back_end
|
|
|
|
self.test_file = trexio.File(filename, mode, back_end)
|
|
assert self.test_file.exists
|
|
|
|
|
|
def test_close(self):
|
|
"""Close the file."""
|
|
self.open()
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
assert not self.test_file.isOpen
|
|
|
|
|
|
def test_errors(self):
|
|
"""Test some exceptions based on trexio.Error class."""
|
|
self.open(filename='unsafe_' + self.filename, mode='w', back_end=self.back_end)
|
|
# try to write a negative number (should raise an error)
|
|
with pytest.raises(trexio.Error):
|
|
trexio.write_nucleus_num(self.test_file, -100)
|
|
|
|
trexio.write_nucleus_num(self.test_file, nucleus_num)
|
|
|
|
# try to overwrite a number (should raise an error)
|
|
with pytest.raises(trexio.Error):
|
|
trexio.write_nucleus_num(self.test_file, nucleus_num * 2)
|
|
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_num(self):
|
|
"""Write a number."""
|
|
self.open()
|
|
trexio.write_nucleus_num(self.test_file, nucleus_num)
|
|
assert trexio.has_nucleus_num(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_str(self):
|
|
"""Write a string."""
|
|
self.open()
|
|
trexio.write_nucleus_point_group(self.test_file, point_group)
|
|
assert trexio.has_nucleus_point_group(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_array_str(self):
|
|
"""Write an array of strings."""
|
|
self.open()
|
|
if not trexio.has_nucleus_num(self.test_file):
|
|
self.test_num()
|
|
trexio.write_nucleus_label(self.test_file, nucleus_label)
|
|
assert trexio.has_nucleus_label(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_array_1D(self):
|
|
"""Write array of charges."""
|
|
self.open()
|
|
if not trexio.has_nucleus_num(self.test_file):
|
|
self.test_num()
|
|
trexio.write_nucleus_charge(self.test_file, nucleus_charge)
|
|
assert trexio.has_nucleus_charge(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_array_2D(self):
|
|
"""Write array of coordinates."""
|
|
self.open()
|
|
if not trexio.has_nucleus_num(self.test_file):
|
|
self.test_num()
|
|
trexio.write_nucleus_coord(self.test_file, nucleus_coord)
|
|
assert trexio.has_nucleus_coord(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_indices(self):
|
|
"""Write array of indices."""
|
|
self.open()
|
|
# type cast is important here because by default numpy transforms a list of integers into int64 array
|
|
indices_np = np.array(nucleus_index, dtype=np.int64)
|
|
# first write basis_shell_num because it is needed to check dimensions of basis_nucleus_index
|
|
trexio.write_basis_shell_num(self.test_file, basis_shell_num)
|
|
# now write the indices
|
|
trexio.write_basis_nucleus_index(self.test_file, indices_np)
|
|
assert trexio.has_basis_nucleus_index(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_sparse(self):
|
|
"""Write a sparse array."""
|
|
self.open()
|
|
# write ao_num (needed later to write sparse ao_2e_int_eri integrals)
|
|
trexio.write_ao_num(self.test_file, ao_num)
|
|
# one complete write (no chunking)
|
|
offset = 0
|
|
trexio.write_ao_2e_int_eri(self.test_file, offset, num_integrals, indices, values)
|
|
assert trexio.has_ao_2e_int_eri(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_determinant(self):
|
|
"""Write CI determinants and coefficients."""
|
|
self.open()
|
|
# write mo_num (needed later to write determinants)
|
|
MO_NUM_TEST = 100
|
|
trexio.write_mo_num(self.test_file, MO_NUM_TEST)
|
|
# get the number of bit-strings per spin component
|
|
INT64_NUM_TEST = int((MO_NUM_TEST-1)/64) + 1
|
|
int_num = trexio.get_int64_num(self.test_file)
|
|
assert int_num == INT64_NUM_TEST
|
|
# write the number of up and down electrons
|
|
trexio.write_electron_up_num(self.test_file, 4)
|
|
trexio.write_electron_dn_num(self.test_file, 3)
|
|
# orbital lists
|
|
orb_list_up = [0,1,2,3]
|
|
orb_list_dn = [0,1,2]
|
|
|
|
# data to write
|
|
DET_NUM_TEST = 100
|
|
det_up = to_determinant_list(orb_list_up, INT64_NUM_TEST)
|
|
det_dn = to_determinant_list(orb_list_dn, INT64_NUM_TEST)
|
|
|
|
det_list = []
|
|
coeff_list = []
|
|
for i in range(DET_NUM_TEST):
|
|
det_list.append(det_up + det_dn)
|
|
coeff_list.append(3.14 + float(i))
|
|
|
|
# write the data for the ground state
|
|
offset = 0
|
|
trexio.write_state_id(self.test_file, 0)
|
|
trexio.write_determinant_list(self.test_file, offset, DET_NUM_TEST, det_list)
|
|
assert trexio.has_determinant_list(self.test_file)
|
|
trexio.write_determinant_coefficient(self.test_file, offset, DET_NUM_TEST, coeff_list)
|
|
assert trexio.has_determinant_coefficient(self.test_file)
|
|
# manually check the consistency between coefficient_size and number of determinants
|
|
assert trexio.read_determinant_coefficient_size(self.test_file) == trexio.read_determinant_num(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_delete_group(self):
|
|
"""Delete a group."""
|
|
self.open(filename='unsafe_' + self.filename, mode='u', back_end=self.back_end)
|
|
|
|
trexio.write_nucleus_num(self.test_file, nucleus_num)
|
|
trexio.write_nucleus_charge(self.test_file, nucleus_charge)
|
|
trexio.flush(self.test_file)
|
|
|
|
assert trexio.has_nucleus_num(self.test_file)
|
|
assert trexio.has_nucleus_charge(self.test_file)
|
|
assert trexio.has_nucleus(self.test_file)
|
|
|
|
trexio.delete_nucleus(self.test_file)
|
|
|
|
assert not trexio.has_nucleus_num(self.test_file)
|
|
assert not trexio.has_nucleus_charge(self.test_file)
|
|
assert not trexio.has_nucleus(self.test_file)
|
|
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_has_group(self):
|
|
"""Check existense of a group."""
|
|
self.open()
|
|
assert trexio.has_nucleus(self.test_file)
|
|
assert not trexio.has_rdm(self.test_file)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_context_manager(self):
|
|
"""Test the with ... as ... context handling."""
|
|
with trexio.File(filename=self.filename, mode='u', back_end=self.back_end) as tfile:
|
|
trexio.write_metadata_description(tfile, 'Test file produced by the Python API')
|
|
assert trexio.has_metadata_description(tfile)
|
|
assert tfile.isOpen
|
|
# the file handle can remain existing but the file itself is closed upon exit from the `with` block
|
|
assert not tfile.isOpen
|
|
|
|
|
|
def test_read_num(self):
|
|
"""Read a number."""
|
|
self.open(mode='r')
|
|
num_r = trexio.read_nucleus_num(self.test_file)
|
|
assert num_r == nucleus_num
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_read_array_1D(self):
|
|
"""Read an array."""
|
|
self.open(mode='r')
|
|
charges_np_r = trexio.read_nucleus_charge(self.test_file)
|
|
assert charges_np_r.dtype is np.dtype(np.float64)
|
|
assert charges_np_r.size == nucleus_num
|
|
np.testing.assert_array_almost_equal(charges_np_r, np.array(nucleus_charge), decimal=8)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_read_array_2D(self):
|
|
"""Read an array."""
|
|
self.open(mode='r')
|
|
# read nuclear coordinates without providing optional argument dim
|
|
coords_np = trexio.read_nucleus_coord(self.test_file)
|
|
assert coords_np.dtype is np.dtype(np.float64)
|
|
assert coords_np.size == nucleus_num * 3
|
|
np.testing.assert_array_almost_equal(coords_np, np.array(nucleus_coord).reshape(nucleus_num,3), decimal=8)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_read_errors(self):
|
|
"""Test some reading errors."""
|
|
self.open(mode='r')
|
|
# unsafe call to read_safe should fail with error message corresponding to TREXIO_UNSAFE_ARRAY_DIM
|
|
with pytest.raises(trexio.Error):
|
|
_ = trexio.read_nucleus_charge(self.test_file, dim=nucleus_num/2)
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_read_integers(self):
|
|
"""Read some integer arrays."""
|
|
self.open(mode='r')
|
|
|
|
indices_np_16 = trexio.read_basis_nucleus_index(self.test_file, dtype=np.int16)
|
|
assert indices_np_16.dtype is np.dtype(np.int16)
|
|
assert (indices_np_16 == np.array(nucleus_index)).all()
|
|
|
|
indices_np_32 = trexio.read_basis_nucleus_index(self.test_file, dtype=np.int32)
|
|
assert indices_np_32.dtype is np.dtype(np.int32)
|
|
assert (indices_np_32 == np.array(nucleus_index)).all()
|
|
|
|
indices_np_64 = trexio.read_basis_nucleus_index(self.test_file)
|
|
assert indices_np_64.dtype is np.dtype(np.int64)
|
|
assert indices_np_64.size == basis_shell_num
|
|
assert (indices_np_64 == np.array(nucleus_index)).all()
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
def test_sparse_read(self):
|
|
"""Read a sparse array."""
|
|
self.open(mode='r')
|
|
# read sparse arrays on ao_2e_int_eri integrals
|
|
buf_size = 60
|
|
offset_file = 0
|
|
# read full buf_size (i.e. the one that does not reach EOF)
|
|
indices_sparse_np, value_sparse_np, read_buf_size, eof = trexio.read_ao_2e_int_eri(self.test_file, offset_file, buf_size)
|
|
#print(f'First complete sparse read size: {read_buf_size}')
|
|
assert not eof
|
|
assert read_buf_size == buf_size
|
|
assert indices_sparse_np[0][0] == 0
|
|
assert indices_sparse_np[read_buf_size-1][3] == read_buf_size * 4 - 1
|
|
|
|
offset_file += buf_size
|
|
# read incomplete buf_size (i.e. the one that does reach EOF)
|
|
indices_sparse_np, value_sparse_np, read_buf_size, eof = trexio.read_ao_2e_int_eri(self.test_file, offset_file, buf_size)
|
|
#print(f'Second incomplete sparse read size: {read_buf_size}')
|
|
assert eof
|
|
assert read_buf_size == (num_integrals - buf_size)
|
|
assert indices_sparse_np[0][0] == offset_file * 4
|
|
assert indices_sparse_np[read_buf_size-1][3] == (offset_file + read_buf_size) * 4 - 1
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_array_str_read(self):
|
|
"""Read an array of strings."""
|
|
self.open(mode='r')
|
|
labels_r = trexio.read_nucleus_label(self.test_file)
|
|
assert len(labels_r) == nucleus_num
|
|
assert labels_r == nucleus_label
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|
|
|
|
|
|
def test_str_read(self):
|
|
"""Read a string."""
|
|
self.open(mode='r')
|
|
point_group_r = trexio.read_nucleus_point_group(self.test_file)
|
|
assert point_group_r == point_group
|
|
if self.test_file.isOpen:
|
|
self.test_file.close()
|