Source code for nabu.pipeline.helical.filtering

# pylint: disable=too-many-arguments
import numpy as np
from ...utils import get_cuda_srcfile, updiv
from ...reconstruction.filtering import get_next_power
from ...reconstruction.filtering_cuda import CudaSinoFilter

# pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-function-args


[docs] class HelicalSinoFilter(CudaSinoFilter): def __init__( self, sino_shape, filter_name=None, padding_mode="zeros", extra_options=None, cuda_options=None, ): """Derived from nabu.reconstruction.filtering.SinoFilter It is used by helical_chunked_regridded pipeline. The shape of the processed sino, as a matter of fact which is due to the helical_chunked_regridded.py module which is using the here present class, is always, but not necessarily [nangles, nslices, nhorizontal] with nslices = 1. This because helical_chunked_regridded.py after a first preprocessing phase, always processes slices one by one. In helical_chunked_regridded .py, the call to the filter_sino method here contained is followed by the weight redistribution ( done by another module), which solves the HA problem, and the backprojection. The latter is performed by fbp.py or hbp.py The reason for having this class, derived from nabu.reconstruction.filtering.SinoFilter, is that the padding mechanism here implemented incorporates the padding with the available theta+180 projection on the half-tomo side. """ super().__init__( sino_shape, filter_name=filter_name, padding_mode=padding_mode, extra_options=extra_options, cuda_options=cuda_options, ) self._init_pad_kernel() def _check_array(self, arr): """ This class may work with an arbitrary number of projections. This is a consequence of the first implementation of the helical pipeline. In the first implementation the slices were reconstructed by backprojecting several turns, and the number of useful projections was different from the beginning or end, to the center of the scan. Now in helical_chunked_regridded.py the number of projections is fixed. The only relic left is that the present class may work with an arbitrary number of projections. """ if arr.dtype != np.float32: raise ValueError("Expected data type = numpy.float32") if arr.shape[1:] != self.sino_shape[1:]: raise ValueError("Expected sinogram shape %s, got %s" % (self.sino_shape, arr.shape)) def _init_pad_kernel(self): """The four possible padding kernels. The first two, compared to nabu.reconstruction.filtering.SinoFilter can work with an arbitrary number of projection. The latter two implement the padding with the available information from theta+180. """ self.kern_args = (self.d_sino_f, self.d_filter_f) self.kern_args += self.d_sino_f.shape[::-1] self._pad_mirror_edges_kernel = self.cuda.kernel( "padding", filename=get_cuda_srcfile("helical_padding.cu"), signature="PPfiiiii", options=[str("-DMIRROR_EDGES")], ) self._pad_mirror_constant_kernel = self.cuda.kernel( "padding", filename=get_cuda_srcfile("helical_padding.cu"), signature="PPfiiiiiff", options=[str("-DMIRROR_CONSTANT")], ) self._pad_mirror_edges_variable_rot_pos_kernel = self.cuda.kernel( "padding", filename=get_cuda_srcfile("helical_padding.cu"), signature="PPPiiiii", options=[str("-DMIRROR_EDGES_VARIABLE_ROT_POS")], ) self._pad_mirror_constant_variable_rot_pos_kernel = self.cuda.kernel( "padding", filename=get_cuda_srcfile("helical_padding.cu"), signature="PPPiiiiiff", options=[str("-DMIRROR_CONSTANT_VARIABLE_ROT_POS")], ) self.d_mirror_indexes = self.cuda.allocate_array( "d_mirror_indexes", (self.sino_padded_shape[-2],), dtype=np.int32 ) self.d_variable_rot_pos = self.cuda.allocate_array( "d_variable_rot_pos", (self.sino_padded_shape[-2],), dtype=np.float32 ) self._pad_edges_kernel = self.cuda.kernel( "padding_edge", filename=get_cuda_srcfile("padding.cu"), signature="Piiiiiiii" ) self._pad_block = (32, 32, 1) self._pad_grid = tuple([updiv(n, p) for n, p in zip(self.sino_padded_shape[::-1], self._pad_block)]) def _pad_sino(self, sino, mirror_indexes=None, rot_center=None): """redefined here to adapt the memory copy to the lenght of the sino argument which, in the general helical case may be varying """ if mirror_indexes is None: self._pad_sino_simple(sino) self.d_mirror_indexes[:] = np.zeros([len(self.d_mirror_indexes)], np.int32) self.d_mirror_indexes[: len(mirror_indexes)] = mirror_indexes.astype(np.int32) if np.isscalar(rot_center): argument_rot_center = rot_center tmp_pad_mirror_edges_kernel = self._pad_mirror_edges_kernel tmp_pad_mirror_constant_kernel = self._pad_mirror_constant_kernel else: self.d_variable_rot_pos[: len(rot_center)] = rot_center argument_rot_center = self.d_variable_rot_pos tmp_pad_mirror_edges_kernel = self._pad_mirror_edges_variable_rot_pos_kernel tmp_pad_mirror_constant_kernel = self._pad_mirror_constant_variable_rot_pos_kernel self.d_sino_padded[: len(sino), : self.dwidth] = sino[:] if self.padding_mode == "edges": tmp_pad_mirror_edges_kernel( self.d_sino_padded, self.d_mirror_indexes, argument_rot_center, self.dwidth, self.n_angles, self.dwidth_padded, self.pad_left, self.pad_right, grid=self._pad_grid, block=self._pad_block, ) else: tmp_pad_mirror_constant_kernel( self.d_sino_padded, self.d_mirror_indexes, argument_rot_center, self.dwidth, self.n_angles, self.dwidth_padded, self.pad_left, self.pad_right, 0.0, 0.0, grid=self._pad_grid, block=self._pad_block, ) def _pad_sino_simple(self, sino): if self.padding_mode == "edges": self.d_sino_padded[: len(sino), : self.dwidth] = sino[:] self._pad_edges_kernel( self.d_sino_padded, self.dwidth, self.n_angles, self.dwidth_padded, self.n_angles, self.pad_left, self.pad_right, 0, 0, grid=self._pad_grid, block=self._pad_block, ) else: # zeros self.d_sino_padded.fill(0) if self.ndim == 2: self.d_sino_padded[: len(sino), : self.dwidth] = sino[:] else: self.d_sino_padded[: len(sino), :, : self.dwidth] = sino[:]
[docs] def filter_sino(self, sino, mirror_indexes=None, rot_center=None, output=None, no_output=False): """ Perform the sinogram siltering. redefined here to use also mirror data Parameters ---------- sino: numpy.ndarray or pycuda.gpuarray.GPUArray Input sinogram (2D or 3D) output: numpy.ndarray or pycuda.gpuarray.GPUArray, optional Output array. no_output: bool, optional If set to True, no copy is be done. The resulting data lies in self.d_sino_padded. """ self._check_array(sino) # copy2d/copy3d self._pad_sino(sino, mirror_indexes=mirror_indexes, rot_center=rot_center) # FFT self.fft.fft(self.d_sino_padded, output=self.d_sino_f) # multiply padded sinogram with filter in the Fourier domain self.mult_kernel(*self.kern_args) # TODO tune block size ? # iFFT self.fft.ifft(self.d_sino_f, output=self.d_sino_padded) # return if no_output: return self.d_sino_padded if output is None: res = np.zeros(self.sino_shape, dtype=np.float32) # can't do memcpy2d D->H ? (self.d_sino_padded[:, w]) I have to get() sino_ref = self.d_sino_padded.get() else: res = output sino_ref = self.d_sino_padded if self.ndim == 2: res[:] = sino_ref[:, : self.dwidth] else: res[:] = sino_ref[:, :, : self.dwidth] return res
def _calculate_shapes(self, sino_shape): self.ndim = len(sino_shape) if self.ndim == 2: n_angles, dwidth = sino_shape n_sinos = 1 elif self.ndim == 3: n_sinos, n_angles, dwidth = sino_shape else: raise ValueError("Invalid sinogram number of dimensions") self.sino_shape = sino_shape self.n_angles = n_angles self.dwidth = dwidth # int() is crucial here ! Otherwise some pycuda arguments (ex. memcpy2D) # will not work with numpy.int64 (as for 2018.X) ### the original get_next_power used in nabu gives a lower ram footprint self.dwidth_padded = 2 * int(get_next_power(self.dwidth)) self.sino_padded_shape = (n_angles, self.dwidth_padded) if self.ndim == 3: self.sino_padded_shape = (n_sinos,) + self.sino_padded_shape sino_f_shape = list(self.sino_padded_shape) sino_f_shape[-1] = sino_f_shape[-1] // 2 + 1 self.sino_f_shape = tuple(sino_f_shape) # self.pad_left = (self.dwidth_padded - self.dwidth) // 2 self.pad_right = self.dwidth_padded - self.dwidth - self.pad_left