from typing import Union
import numpy as np
import dask
from dask.diagnostics import ProgressBar
import dask.array as da
from tqdm import tqdm
from .utils import is_gpu_available
try:
import cupy as cp
except ImportError:
pass
[docs]def ddm(
data: Union[dask.array.core.Array, np.ndarray],
taus: np.ndarray = np.arange(0),
bulk: bool = False,
) -> np.ndarray:
"""_summary_
Parameters
----------
data : Union[dask.array.core.Array, np.ndarray]
taus : np.ndarray, optional
array of lag times (in frames), by default half of number of frames
bulk : bool, optional
Call dask.compute on entire tau range for GPU processing, by default False
Returns
-------
np.ndarray
Raises
------
TypeError
Data type is not supported. Supported types are np.ndarray and dask.array.core.Array.
"""
# Check lag time range
taus = np.arange(1, len(data) // 2) if taus.size == 0 else taus
if taus[0] == 0:
raise ValueError("Cannot calculate 0 lag time, please start range at 1")
data_type = data.data
if isinstance(data_type, np.ndarray):
return ddm_numpy(data, taus)
elif isinstance(data_type, dask.array.core.Array):
if is_gpu_available:
try:
import cupy as cp
print("Running analysis on GPU")
return ddm_dask_gpu(data, taus, bulk)
except ImportError:
print("Running analysis on CPU")
return ddm_dask_cpu(data, taus)
else:
print("Running analysis on CPU")
return ddm_dask_cpu(data, taus)
else:
raise (TypeError, f"Data of type {data_type} is not supported")
[docs]def ddm_numpy(data, taus: np.ndarray):
"""_summary_
Parameters
----------
data : np.array
image stack
Returns
-------
np.array
ddm matrix
"""
num_frames, height, width = data.shape
results = []
img_fft = np.fft.fft2(data).astype(np.complex64)
for tau in taus:
result = dask.delayed(calc_matrix)(img_fft, tau, num_frames, height, width)
results.append(result)
with ProgressBar():
out = dask.compute(*results)
return np.asarray(out)
[docs]def ddm_dask_cpu(data, taus: np.ndarray):
"""_summary_
Parameters
----------
data : _type_
_description_
taus : _type_, optional
_description_, by default np.arange(0)
Returns
-------
_type_
_description_
"""
num_frames, _, _ = data.shape
results = []
fft_data = da.fft.fft2(data.data).astype(np.complex64)
for tau in taus:
result = calc_matrix_dask(fft_data, tau)
results.append(result)
fft_shift = dask.compute(*results)
out = []
for row, tau in zip(fft_shift, taus):
out.append(calc_radial(row, num_frames, tau))
return np.asarray(out)
[docs]def ddm_dask_gpu(data, taus: np.ndarray = np.arange(0), bulk: bool = False):
"""_summary_
Parameters
----------
data : _type_
_description_
taus : _type_, optional
_description_, by default np.arange(0)
bulk : bool, optional
Call dask.compute on entire tau range, by default False
Returns
-------
_type_
_description_
"""
taus = np.arange(1, len(data) // 2) if taus.size == 0 else taus
num_frames, width, height = data.shape
results = []
chunk_size = (data.chunks[0][0], width, height)
data_gpu = da.from_array(cp.asarray(data.data), chunks=chunk_size, asarray=False)
fft_data = da.fft.fft2(data_gpu).astype(cp.complex64)
del data_gpu
if not bulk: # compute taus in separate steps
for tau in tqdm(taus):
result = calc_matrix_dask(fft_data, tau)
out = dask.compute(result, scheduler="single-threaded")
results.append(cp.asnumpy(out[0]))
del result, out
cp._default_memory_pool.free_all_blocks()
del fft_data
cp._default_memory_pool.free_all_blocks()
out = []
for data, tau in zip(results, taus):
out.append(calc_radial(data, num_frames, tau))
else: # compute all taus in one step
for tau in taus:
result = calc_matrix_dask(fft_data, tau)
results.append(result)
del result
cp._default_memory_pool.free_all_blocks()
out = dask.compute(*results, scheduler="single-threaded")
del fft_data, results
cp._default_memory_pool.free_all_blocks()
fft_shift = np.asarray([cp.asnumpy(x) for x in out])
# return fft_shift
out = []
for data, tau in zip(fft_shift, taus):
out.append(calc_radial(data, num_frames, tau))
return np.asarray(out)
[docs]def calc_matrix(img_fft, tau, num_frames, height, width):
"""_summary_
Parameters
----------
img_fft : np.array
_description_
tau : _type_
_description_
num_frames : _type_
_description_
height : _type_
_description_
width : _type_
_description_
Returns
-------
_type_
_description_
"""
img_diff = img_fft[:-tau, :, :] - img_fft[tau:, :, :]
img_fft_sq = np.abs(img_diff) ** 2
img_sum = np.sum(img_fft_sq, axis=0)
fft_shift = np.fft.fftshift(img_sum)
gTau = fft_shift / (num_frames - tau)
gTauRadial = radial_profile(gTau, (width / 2.0, height / 2.0))
return gTauRadial
[docs]def calc_matrix_dask(img_fft, tau):
"""_summary_
Parameters
----------
img_fft : np.array
_description_
tau : _type_
_description_
Returns
-------
_type_
_description_
"""
img_diff = img_fft[:-tau, :, :] - img_fft[tau:, :, :]
img_fft_sq = da.abs(img_diff) ** 2
img_sum = da.sum(img_fft_sq, axis=0)
fft_shift = da.fft.fftshift(img_sum)
return fft_shift
[docs]def calc_radial(fft_shift: np.ndarray, num_frames, tau):
"""_summary_
Parameters
----------
fft_shift : _type_
_description_
num_frames : _type_
_description_
tau : _type_
_description_
Returns
-------
_type_
_description_
"""
width, height = fft_shift.shape
gTau = fft_shift / (num_frames - tau)
gTauRadial = radial_profile(gTau, (width / 2.0, height / 2.0))
return gTauRadial
[docs]def calc_radial_gpu(fft_shift: np.ndarray, num_frames, tau):
"""_summary_
Parameters
----------
fft_shift : _type_
_description_
num_frames : _type_
_description_
tau : _type_
_description_
Returns
-------
_type_
_description_
"""
width, height = fft_shift.shape
gTau = fft_shift / (num_frames - tau)
gTauRadial = radial_profile_gpu(gTau, (width / 2.0, height / 2.0))
return gTauRadial
[docs]def radial_profile(data: np.ndarray, centre: tuple):
"""_summary_
Parameters
----------
data : np.ndarray
_description_
centre : tuple
_description_
Returns
-------
_type_
_description_
"""
x, y = np.indices((data.shape))
r = np.sqrt((x - centre[0]) ** 2 + (y - centre[1]) ** 2)
r = r.astype(np.int)
tbin = np.bincount(r.ravel(), data.ravel())
nr = np.bincount(r.ravel())
radialprofile = tbin / nr
return radialprofile
[docs]def radial_profile_gpu(data: np.ndarray, centre: tuple):
"""_summary_
Parameters
----------
data : np.ndarray
_description_
centre : tuple
_description_
Returns
-------
_type_
_description_
"""
x, y = cp.indices((data.shape))
r = cp.sqrt((x - centre[0]) ** 2 + (y - centre[1]) ** 2)
r = r.astype(cp.int)
tbin = cp.bincount(r.ravel(), data.ravel())
nr = cp.bincount(r.ravel())
radialprofile = tbin / nr
return radialprofile