Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions heracles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"cl2corr",
"corr2cl",
# unmixing
"natural_unmixing",
"naturalspice",
]

try:
Expand Down Expand Up @@ -157,5 +157,5 @@
)

from .unmixing import (
natural_unmixing,
naturalspice,
)
7 changes: 3 additions & 4 deletions heracles/dices/jackknife.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from ..result import Result, get_result_array
from ..mapping import transform
from ..twopoint import angular_power_spectra
from ..unmixing import _natural_unmixing, logistic
from ..unmixing import real_naturalspice, logistic
from ..transforms import cl2corr

try:
Expand Down Expand Up @@ -62,7 +62,7 @@ def jackknife_cls(data_maps, vis_maps, jk_maps, fields, mask_correction="Fast",
# Mask correction
if mask_correction == "Full":
alphas = get_mask_correlation_ratio(_cls_mm, mls0)
_cls = _natural_unmixing(_cls, alphas, fields)
_cls = real_naturalspice(_cls, alphas, fields)
elif mask_correction == "Fast":
_cls = correct_footprint_reduction(_cls, jk_maps, fields, *regions)
else:
Expand Down Expand Up @@ -260,8 +260,7 @@ def get_mask_correlation_ratio(Mljk, Mls0):
wmljk = cl2corr(mljk)
wmljk = wmljk.T[0]
# Compute alpha
alpha = wmljk / wmls0
alpha *= logistic(np.log10(abs(wmljk)))
alpha = wmls0 / (wmljk * logistic(np.log10(abs(wmljk))))
alphas[key] = replace(Mls0[key], array=alpha)
return alphas

Expand Down
45 changes: 40 additions & 5 deletions heracles/twopoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,37 @@ def mixing_matrices(
return out


def svd_pinv(A, rtol=1e-2):
"""`
Compute the Moore–Penrose pseudoinverse of a matrix A
using its Singular Value Decomposition (SVD).

Parameters
----------
A : (m, n) array_like
Input matrix to be pseudoinverted.
rtol : float, optional
Cutoff for small singular values.
Singular values smaller than rtol * max(s) are set to zero.

Returns
-------
A_pinv : (n, m) ndarray
The pseudoinverse of A.
"""
# Step 1: Compute SVD
U, s, Vt = np.linalg.svd(A, full_matrices=False)

# Step 2: Invert singular values with tolerance
tol = rtol * np.max(s)
s_pinv = np.array([1 / si if si > tol else 0 for si in s])

# Step 3: Reconstruct pseudoinverse
# A^+ = V * Σ^+ * U^T
A_pinv = (Vt.T * s_pinv) @ U.T
return A_pinv, s_pinv


def invert_mixing_matrix(
M,
rtol: float = 1e-5,
Expand All @@ -417,11 +448,13 @@ def invert_mixing_matrix(

Returns:
inv_M: inverted Cls in the same mapping form
inv_s: singular values of the mixing matrices
"""
if progress is None:
progress = NoProgress()

inv_M = {}
inv_s = {}
current, total = 0, len(M)

for key, value in M.items():
Expand All @@ -438,8 +471,8 @@ def invert_mixing_matrix(
# makes the mixing matrix block-diagonal
M_p = _M[0] + _M[1]
M_m = _M[0] - _M[1]
inv_M_p = np.linalg.pinv(M_p, rcond=rtol)
inv_M_m = np.linalg.pinv(M_m, rcond=rtol)
inv_M_p, inv_s_p = svd_pinv(M_p, rtol=rtol)
inv_M_m, inv_s_m = svd_pinv(M_m, rtol=rtol)
_inv_m = np.vstack(
(
np.hstack(((inv_M_p + inv_M_m) / 2, (inv_M_p - inv_M_m) / 2)),
Expand All @@ -448,13 +481,15 @@ def invert_mixing_matrix(
)
_inv_M_EEEE = _inv_m[:_m, :_n]
_inv_M_EEBB = _inv_m[_m:, :_n]
_inv_M_EBEB = np.linalg.pinv(_M[2], rcond=rtol)
_inv_M_EBEB, inv_s_eb = svd_pinv(_M[2], rtol=rtol)
_inv_M = np.array([_inv_M_EEEE, _inv_M_EEBB, _inv_M_EBEB])
_inv_s = np.array([inv_s_p, inv_s_m, inv_s_eb])
else:
_inv_M = np.linalg.pinv(_M, rcond=rtol)
_inv_M, _inv_s = svd_pinv(_M, rtol=rtol)

inv_M[key] = replace(M[key], array=_inv_M)
return inv_M
inv_s[key] = replace(M[key], array=_inv_s)
return inv_M, inv_s


def apply_mixing_matrix(d, M, lmax=None):
Expand Down
68 changes: 48 additions & 20 deletions heracles/unmixing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
import numpy as np
from heracles.twopoint import mixing_matrices, invert_mixing_matrix, apply_mixing_matrix
from .result import truncated
from .transforms import cl2corr, corr2cl
from .utils import get_cl
Expand All @@ -28,31 +29,58 @@
from dataclasses import replace


def natural_unmixing(d, m, fields, x0=-2, k=50, patch_hole=True, lmax=None):
def naturalspice(d, m, fields, mode="Harmonic", rtol=0.0, lmax=None):
"""
Natural unmixing of the data Cl.
Args:
d: Data Cl
m: mask Cl
fields: list of fields
patch_hole: If True, apply the patch hole correction
rtol: Relative tolerance for logistic correction
Returns:
corr_d: Corrected Cl
"""
wm = {}
m_keys = list(m.keys())
for m_key in m_keys:
_m = m[m_key].array
_wm = cl2corr(_m).T[0]
if patch_hole:
_wm *= logistic(np.log10(abs(_wm)), x0=x0, k=k)
wm[m_key] = replace(m[m_key], array=_wm)
return _natural_unmixing(d, wm, fields, lmax=lmax)
# Step 1: Transform.
# In harmonic space this means computing the singular values
# of the mixing matrix.
# In real space this means computing the correlation function.
if mode == "Harmonic":
M = mixing_matrices(fields, m)

elif mode == "Real":
wm = {}
m_keys = list(m.keys())
for m_key in m_keys:
_m = m[m_key].array
_wm = cl2corr(_m).T[0]
wm[m_key] = replace(m[m_key], array=_wm)

# Step 2: Regularize
if mode == "Harmonic":
iM, inv_s = invert_mixing_matrix(M, rtol=rtol)
elif mode == "Real":
inv_s = {}
m_keys = list(m.keys())
for m_key in m_keys:
_m = m[m_key].array
# Apply logistic correction
tol = rtol * np.max(np.abs(wm))
_wm *= logistic(np.log10(abs(_wm)), tol=np.log10(tol))
inv_s[m_key] = replace(m[m_key], array=1 / _wm)

# Step 3: Apply correction
if mode == "Harmonic":
corr_d = apply_mixing_matrix(d, iM)
elif mode == "Real":
corr_d = real_naturalspice(d, inv_s, fields, lmax=lmax)
return corr_d, inv_s

def _natural_unmixing(d, wm, fields, lmax=None):

def real_naturalspice(d, inv_wm, fields, lmax=None):
"""
Natural unmixing of the data Cl.
Computes the correlation function of the data Cl,
divides multiplies it by the provided inverse mask correlation function,
and transforms back to Cl.
Args:
d: Data Cl
wm: mask correlation function
Expand All @@ -70,12 +98,12 @@ def _natural_unmixing(d, wm, fields, lmax=None):
for key in d.keys():
a, b, i, j = key
m_key = (masks[a], masks[b], i, j)
_wm = get_cl(m_key, wm)
_inv_wm = get_cl(m_key, inv_wm)
_d = d[key]
s1, s2 = _d.spin
if lmax is None:
*_, lmax = _d.shape
lmax_mask = len(_wm.array)
lmax_mask = len(_inv_wm.array)
# Grab metadata
dtype = _d.array.dtype
# pad cls
Expand All @@ -102,8 +130,8 @@ def _natural_unmixing(d, wm, fields, lmax=None):
)
# Correct by alpha
wd = cl2corr(__d.T).T + 1j * cl2corr(__id.T).T
corr_wd = (wd / _wm).real
icorr_wd = (wd / _wm).imag
corr_wd = (wd * _inv_wm).real
icorr_wd = (wd * _inv_wm).imag
# Transform back to Cl
__corr_d = corr2cl(corr_wd.T).T
__icorr_d = corr2cl(icorr_wd.T).T
Expand All @@ -118,7 +146,7 @@ def _natural_unmixing(d, wm, fields, lmax=None):
_corr_d = []
for cl in _d:
wd = cl2corr(cl).T
corr_wd = wd / _wm
corr_wd = wd * _inv_wm
# Transform back to Cl
__corr_d = corr2cl(corr_wd.T).T
_corr_d.append(__corr_d[0])
Expand All @@ -132,5 +160,5 @@ def _natural_unmixing(d, wm, fields, lmax=None):
return corr_d


def logistic(x, x0=-5, k=50):
return 1.0 + np.exp(-k * (x - x0))
def logistic(x, tol=-5, k=50):
return 1.0 + np.exp(-k * (x - tol))
2 changes: 1 addition & 1 deletion tests/test_dices.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_get_delete2_fsky(jk_maps, njk):

def test_full_mask_correction(cls0, mls0, fields):
alphas = dices.get_mask_correlation_ratio(mls0, mls0)
_cls = heracles.unmixing._natural_unmixing(cls0, alphas, fields)
_cls = heracles.unmixing.real_naturalspice(cls0, alphas, fields)
for key in list(cls0.keys()):
cl = cls0[key].array
_cl = _cls[key].array
Expand Down
10 changes: 8 additions & 2 deletions tests/test_twopoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,34 +368,40 @@ def test_inverting_mixing_matrices():
("WHT", "WHT", 0, 0): Result(cl, spin=(0, 0), axis=(0,)),
}
mms = mixing_matrices(fields, cls2, l1max=10, l2max=20)
inv_mms = invert_mixing_matrix(mms)
inv_mms, inv_s = invert_mixing_matrix(mms)

# test for correct shape
for key in mms.keys():
*_, n, m = mms[key].shape
*_, _n, _m = inv_mms[key].shape
*_, __n = inv_s[key].shape
s1, s2 = mms[key].spin
_s1, _s2 = inv_mms[key].spin
assert s1 == _s1
assert s2 == _s2
assert n == _m
assert m == _n
assert n == __n

# test that the inverse is correct
mms = mixing_matrices(fields, cls2)
for key in mms:
_m = np.ones_like(mms[key].array)
mms[key] = Result(_m, spin=mms[key].spin, axis=mms[key].axis, ell=mms[key].ell)

inv_mms = invert_mixing_matrix(mms)
inv_mms, inv_s = invert_mixing_matrix(mms)
assert inv_mms.keys() == mms.keys()
for key in mms:
inv_mm = inv_mms[key].array
if inv_mm.ndim == 3:
_inv_m = np.sum(inv_mm)
np.testing.assert_allclose(
inv_s[key].array.T[0], [1 / (2 * (lmax + 1)), 0.0, 1 / (lmax + 1)]
)
np.testing.assert_allclose(_inv_m, 1.5)
else:
_inv_m = np.sum(inv_mm)
np.isclose(inv_s[key][0], 1 / (lmax + 1))
np.testing.assert_allclose(_inv_m, 1.0)

# test application of mixing matrices
Expand Down