from .double_flatfield import DoubleFlatField
from ..utils import check_shape
from ..cuda.utils import __has_pycuda__
from ..cuda.processing import CudaProcessing
from ..processing.unsharp_cuda import CudaUnsharpMask
from .ccd_cuda import CudaLog
if __has_pycuda__:
import pycuda.gpuarray as garray
import pycuda.cumath as cumath
[docs]
class CudaDoubleFlatField(DoubleFlatField):
def __init__(
self,
shape,
result_url=None,
sub_region=None,
detector_corrector=None,
input_is_mlog=True,
output_is_mlog=False,
average_is_on_log=False,
sigma_filter=None,
filter_mode="reflect",
log_clip_min=None,
log_clip_max=None,
cuda_options=None,
):
"""
Init double flat field with Cuda backend.
"""
self.cuda_processing = CudaProcessing(**(cuda_options or {}))
super().__init__(
shape,
result_url=result_url,
sub_region=sub_region,
detector_corrector=detector_corrector,
input_is_mlog=input_is_mlog,
output_is_mlog=output_is_mlog,
average_is_on_log=average_is_on_log,
sigma_filter=sigma_filter,
filter_mode=filter_mode,
log_clip_min=log_clip_min,
log_clip_max=log_clip_max,
)
self._init_gaussian_filter()
def _init_gaussian_filter(self):
if self.sigma_filter is None:
return
self._unsharp_mask = CudaUnsharpMask(self.shape, self.sigma_filter, -1.0, mode=self.filter_mode, method="log")
@staticmethod
def _proc_copy(x, o):
o[:] = x[:]
return o
@staticmethod
def _proc_expm(x, o):
o[:] = x[:]
o[:] *= -1
cumath.exp(o, out=o) # pylint: disable=E0606
return o
def _init_processing(self, input_is_mlog, output_is_mlog, average_is_on_log, sigma_filter, filter_mode):
self.input_is_mlog = input_is_mlog
self.output_is_mlog = output_is_mlog
self.average_is_on_log = average_is_on_log
self.sigma_filter = sigma_filter
if self.sigma_filter is not None and abs(float(self.sigma_filter)) < 1e-4:
self.sigma_filter = None
self.filter_mode = filter_mode
# proc = lambda x,o: np.copyto(o, x)
proc = self._proc_copy
self._mlog = CudaLog((1,) + self.shape, clip_min=self._log_clip_min, clip_max=self._log_clip_max)
if self.input_is_mlog:
if not self.average_is_on_log:
# proc = lambda x,o: np.exp(-x, out=o)
proc = self._proc_expm
else:
if self.average_is_on_log:
# proc = lambda x,o: -np.log(x, out=o)
proc = self._proc_mlog
# postproc = lambda x: x
postproc = self._proc_copy
if self.output_is_mlog:
if not self.average_is_on_log:
# postproc = lambda x: -np.log(x)
postproc = self._proc_mlog
else:
if self.average_is_on_log:
# postproc = lambda x: np.exp(-x)
postproc = self._proc_expm
self.proc = proc
self.postproc = postproc
[docs]
def compute_double_flatfield(self, radios, recompute=False):
"""
Read the radios and generate the "double flat field" by averaging
and possibly other processing.
Parameters
----------
radios: array
Input radios chunk.
recompute: bool, optional
Whether to recompute the double flatfield if already computed.
"""
if not (isinstance(radios, garray.GPUArray)): # pylint: disable=E0606
raise ValueError("Expected pycuda.gpuarray.GPUArray for radios")
if self._computed and not (recompute):
return self.doubleflatfield
acc = garray.zeros(radios[0].shape, "f")
tmpdat = garray.zeros(radios[0].shape, "f")
for i in range(radios.shape[0]):
self.proc(radios[i], tmpdat)
acc += tmpdat
acc /= radios.shape[0]
if self.sigma_filter is not None:
# acc = acc - gaussian_filter(acc, self.sigma_filter, mode=self.filter_mode)
self._unsharp_mask.unsharp(acc, tmpdat)
acc[:] = tmpdat[:]
self.postproc(acc, tmpdat)
self.doubleflatfield = tmpdat
# Handle small values to avoid issues when dividing
# self.doubleflatfield[np.abs(self.doubleflatfield) < self._small] = 1.
cumath.fabs(self.doubleflatfield, out=acc)
acc -= self._small # acc = abs(doubleflatfield) - _small
garray.if_positive(acc, self.doubleflatfield, garray.zeros_like(acc) + self._small, out=self.doubleflatfield)
if self.writer is not None:
self.writer.write(self.doubleflatfield.get(), "double_flatfield")
self._computed = True
return self.doubleflatfield
[docs]
def get_double_flatfield(self, radios=None, compute=False):
"""
Get the double flat field or a subregion of it.
Parameters
----------
radios: array, optional
Input radios chunk
compute: bool, optional
Whether to compute the double flatfield anyway even if a dump file
exists.
"""
if self.reader is None:
if radios is None:
raise ValueError("result_url was not provided. Please provide 'radios' to this function")
return self.compute_double_flatfield(radios)
if radios is not None and compute:
res = self.compute_double_flatfield(radios)
else:
res = self._load_dff_dump()
res = garray.to_gpu(res)
self._computed = True
return res
[docs]
def apply_double_flatfield(self, radios):
"""
Apply the "double flatfield" filter on a chunk of radios.
The processing is done in-place !
"""
check_shape(radios.shape, self.radios_shape, "radios")
dff = self.get_double_flatfield(radios=radios)
for i in range(self.n_angles):
radios[i] /= dff
return radios