Source code for tilepy.include.MapManagement.SkyMap

# Copyright (C) 2016-2025  tilepy developers
# (Monica Seglar-Arroyo, Halim Ashkar, Fabian Schussler, Mathieu de Bony)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

##################################################################################################
#                            Skymap class                                                        #
##################################################################################################

import astropy.units as u
import healpy as hp
import numpy as np
from astropy.coordinates import SkyCoord
from scipy.stats import norm

from tilepy.include.PointingTools import Tools

__all__ = ["SkyMap"]


[docs] class SkyMap: """ Representation and utility methods for gravitational-wave localization sky maps. This class handles both 2D (probability density) and 3D (distance) sky maps, provides area and pixel selection utilities, manages rasterization and caching, and can compute probabilities for galaxies in a catalog. """ def __init__(self, obspar, mapReader): """ Initialize the SkyMap object. Parameters ---------- obspar : object Observation parameters. mapReader : object Interface for sky map data. """ self.raw_map_prob_density = mapReader.getMap("prob") if obspar.algorithm == "2D": self.is3D = False else: self.is3D = self.determine3D(obspar, mapReader) self.mode = getattr(obspar, "mode", None) if self.is3D: self.raw_map_dist_mean = mapReader.getMap("distMean") self.raw_map_dist_sigma = mapReader.getMap("distSigma") self.raw_map_dist_norm = mapReader.getMap("distNorm") self.minimumProbCutForGalaxyCatalog = obspar.minimumProbCutForCatalogue self.rasterized_map_cache = {} self.pix_id_area_cache = {}
[docs] def determine3D(self, obspar, mapReader): """ Decide whether the sky map should be treated as 3D. Checks map properties and observation parameters to set 3D usage. Parameters ---------- obspar : object Observation parameters. mapReader : object Map reading interface. Returns ------- is3D : bool True if 3D mode is used, False otherwise. """ is3D = True if mapReader.has3D: dist_mean, dist_std = mapReader.getDistance() if dist_mean + 2 * dist_std > obspar.distCut: is3D = False pix_maximum = np.argmax(self.raw_map_prob_density.data) lon, lat = self.raw_map_prob_density.pix2ang(pix_maximum, lonlat=True) coordinate = SkyCoord(ra=lon * u.deg, dec=lat * u.deg) if Tools.GalacticPlaneBorder(coordinate): is3D = False else: is3D = False return is3D
[docs] def getPixIdArea(self, fraction_localisation, nside=None, scheme="ring"): """ Return pixel indices covering a specified localization probability. Selects pixels, ordered by probability, until the given `fraction_localisation` of the total probability is included. Parameters ---------- fraction_localisation : float Probability threshold (e.g., 0.9 for 90% localization). nside : int or None, optional HEALPix nside to use for rasterization (default: None). scheme : str, optional HEALPix ordering scheme, either 'ring' (default) or 'nested'. Returns ------- pix_id : ndarray of int Pixel indices containing the requested probability fraction. """ cache_line = ( f"{fraction_localisation}_raw" if nside is None else f"{fraction_localisation}_{nside}_{scheme}" ) if cache_line in self.pix_id_area_cache.keys(): return self.pix_id_area_cache[cache_line] if nside is None: sorted_pixel_id = np.flipud(np.argsort(self.raw_map_prob_density.data)) prob_sorted = ( self.raw_map_prob_density[sorted_pixel_id] * self.raw_map_prob_density.pixarea(sorted_pixel_id) ).value else: prob = self.getMap("prob", nside=nside, scheme=scheme) sorted_pixel_id = np.flipud(np.argsort(prob)) prob_sorted = prob[sorted_pixel_id] summed_probability = np.cumsum(prob_sorted) self.pix_id_area_cache[cache_line] = sorted_pixel_id[ summed_probability <= fraction_localisation ] return self.pix_id_area_cache[cache_line]
[docs] def getArea(self, fraction_localisation): area_vals = self.raw_map_prob_density.pixarea( self.getPixIdArea(fraction_localisation) ) return np.sum(area_vals.to(u.deg * u.deg))
[docs] def getMap(self, mapType, nside, scheme="ring"): """ Get a rasterized map of the specified type and pixelization. Parameters ---------- mapType : str Type of map ('prob_density', 'prob', 'coordinate'). nside : int Desired HEALPix nside resolution. scheme : str, optional HEALPix ordering scheme: 'ring' (default) or 'nested'. Returns ------- The requested map: a pixel array for 'prob' or 'prob_density', or an array of sky coordinates for 'coordinate'. Raises ------ Exception If `mapType` is not recognized. """ cache_entry = mapType + "_" + str(nside) + "_" + scheme if cache_entry in self.rasterized_map_cache.keys(): return self.rasterized_map_cache[cache_entry] if mapType == "prob_density": self.rasterized_map_cache[cache_entry] = ( self.raw_map_prob_density.rasterize(nside=nside, scheme=scheme).data ) elif mapType == "prob": self.rasterized_map_cache[cache_entry] = self.getMap( "prob_density", nside, scheme ) * hp.nside2pixarea(nside) elif mapType == "coordinate": npix = hp.nside2npix(nside) id_pix = np.arange(npix) lon, lat = hp.pix2ang(nside, id_pix, nest=(scheme == "nested"), lonlat=True) self.rasterized_map_cache[cache_entry] = SkyCoord( ra=lon * u.deg, dec=lat * u.deg ) else: raise Exception("Unknown type of map") return self.rasterized_map_cache[cache_entry]
[docs] def getMaximumProbabilityCoordinates(self, obspar=None): """ Returns the sky coordinates (RA, Dec) of the highest-probability pixel in the raw probability density map. """ if obspar is not None and getattr(obspar, "mode", None) == "gaussian": maxCoord = SkyCoord( ra=obspar.raSource * u.deg, dec=obspar.decSource * u.deg ) else: ipix_max = np.argmax(self.raw_map_prob_density.data) lon, lat = self.raw_map_prob_density.pix2ang(ipix_max, lonlat=True) maxCoord = SkyCoord(ra=lon * u.deg, dec=lat * u.deg) return maxCoord
[docs] def computeGalaxyProbability(self, galaxyCatalog, mangrove=False): """ Compute localization probability for each galaxy in a catalog. Adds a column `dp_dV` to the input table with the probability density at each galaxy's position (and distance if in 3D). Parameters ---------- galaxyCatalog : astropy.table.Table or pandas.DataFrame Table of galaxies with 'RAJ2000', 'DEJ2000', and 'Dist' columns. mangrove : bool, optional Flag to use the mangrove method of weighting by the mass of the host galaxy. Returns ------- galaxyCatalog : same type as input Input table with additional 'dp_dV' probability column. """ # Identify the pixel associated to each galaxy ra = galaxyCatalog["RAJ2000"] dec = galaxyCatalog["DEJ2000"] dist = galaxyCatalog["Dist"] theta = 0.5 * np.pi - np.deg2rad(dec) phi = np.deg2rad(ra) pix_id = self.raw_map_prob_density.ang2pix(theta, phi) # Compute the probability associated to each galaxy if self.is3D: galaxyCatalog["dp_dV"] = ( self.raw_map_prob_density[pix_id] * self.raw_map_dist_norm[pix_id] * norm( self.raw_map_dist_mean[pix_id], self.raw_map_dist_sigma[pix_id] ).pdf(dist) * self.raw_map_prob_density.pixarea(pix_id) ) else: galaxyCatalog["dp_dV"] = self.raw_map_prob_density[ pix_id ] * self.raw_map_prob_density.pixarea(pix_id) return galaxyCatalog