Source code for tilepy.include.TilingDetermination

# Copyright (C) 2016-2025  tilepy developers
# (Monica Seglar-Arroyo, Halim Ashkar, Fabian Schussler, Mathieu de Bony)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
##################################################################################################
#       Mid-level functions use to obtain the tiling observation scheduling
##################################################################################################


import datetime
import logging
import random

import astropy.coordinates as co
import healpy as hp
import numpy as np
import pytz
import six
from astropy import units as u
from astropy.table import Table
from six.moves import configparser

from .MaskingTools import (
    FulfillsRequirement,
    FulfillsRequirementGreyObservations,
    GetBestGridPos2D,
    GetBestGridPos3D,
    OccultationCut,
    SAA_Times,
    VisibleAtTime,
    ZenithAngleCut,
)
from .PointingTools import (
    ComputeProbability2D,
    ComputeProbGalTargeted,
    ComputeProbPGALIntegrateFoV,
    FilterGalaxies,
    FindMatchingCoords,
    FindMatchingPixList,
    GetBestNSIDE,
    GetRegionPixReduced,
    GetSatelliteName,
    GetSatellitePositions,
    GetSatelliteTime,
    LoadGalaxies,
    LoadGalaxies_SteMgal,
    MangroveGalaxiesProbabilities,
    ModifyCatalogue,
    NextWindowTools,
    NightDarkObservation,
    NightDarkObservationwithGreyTime,
    SubstractPointings,
    SubstractPointings2D,
    Tools,
    TransformPixToRaDec,
    TransformRADecToPix,
)

if six.PY2:
    ConfigParser = configparser.SafeConfigParser
else:
    ConfigParser = configparser.ConfigParser


utc = pytz.UTC
############################################

#              General definitions              #

############################################

__all__ = [
    "PGWinFoV",
    "PGalinFoV",
    "ObservationStartperObs",
    "PGWinFoV_NObs",
    "PGalinFoV_NObs",
    "PGWinFoV_Space_NObs",
]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


[docs] def PGWinFoV(skymap, nameEvent, obspar, dirName): """ Mid-level function that is called by GetSchedule to compute a observation schedule based on a 2D method. Parameters ---------- skymap : SkyMap The object containing the sky maps. nameEvent : str The name of the event. obspar : ObservationParameters Observation parameters, including: - obsTime (datetime): Desired time for scheduling to start. - pointingsFile (str): Path to file with previous pointings. - ... (other parameters as needed) Class containing the telescope configuration parameters. dirName : str Output directory for schedules and plots. Returns ------- SuggestedPointings : astropy.table.Table Table of scheduled pointings. ObservationTime0 : str the desired time for scheduling to start. """ ObservationTime0 = obspar.obsTime PointingFile = obspar.pointingsFile # Main parameters logger.info(f"Obspar:\n{obspar}") random.seed() RAarray = [] DECarray = [] pixlist = [] pixlistHR = [] pixlist1 = [] pixlistHR1 = [] P_GWarray = [] ObservationTimearray = [] Round = [] Fov_obs = [] ObsName = [] Duration = [] logger.info("\n------------------- NEW EVENT --------------------\n") # Retrieve maps prob = skymap.getMap("prob", obspar.reducedNside) highres = skymap.getMap("prob", obspar.HRnside) # Create table for 2D probability at 90% containment rapix, decpix, areapix = GetRegionPixReduced( prob, obspar.percentageMOC, obspar.reducedNside ) radecs = co.SkyCoord(rapix, decpix, frame="icrs", unit=(u.deg, u.deg)) # Add observed pixels to pixlist maxRuns = obspar.maxRuns if PointingFile is not None: logger.info( "===========================================================================================" ) pixlist, pixlistHR, sumPGW, doneObs = SubstractPointings2D( PointingFile, prob, obspar, pixlist, pixlistHR ) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info(f"Total GW probability already covered: {sumPGW}") logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ####################################################### logger.info("---------- NEW FOLLOW-UP ATTEMPT ----------") if obspar.useGreytime: NightDarkRuns = NightDarkObservationwithGreyTime(ObservationTime0, obspar) else: NightDarkRuns = NightDarkObservation(ObservationTime0, obspar) counter = 0 for j, NightDarkRun in enumerate(NightDarkRuns): if len(ObservationTimearray) < maxRuns: ObservationTime = NightDarkRun ObsBool, yprob = ZenithAngleCut(prob, ObservationTime, obspar) if ObsBool: # Round 1 P_GW, TC, pixlist, pixlistHR = ComputeProbability2D( obspar, prob, highres, radecs, ObservationTime, pixlist, pixlistHR, counter, dirName, ) if (P_GW <= obspar.minProbcut) and obspar.secondRound: # Try Round 2 # print('The minimum probability cut being', minProbcut * 100, '% is, unfortunately, not reached.') yprob1 = highres P_GW, TC, pixlist1, pixlistHR1 = ComputeProbability2D( obspar, prob, yprob1, radecs, ObservationTime, pixlist1, pixlistHR1, counter, dirName, ) if P_GW <= obspar.minProbcut: logger.info( "Tile Pgw= {P_GW} is smaller than the minProbCut ({obspar.minProbcut}) => skip this tile" ) else: Round.append(2) P_GWarray.append(float("{:1.4f}".format(float(P_GW)))) RAarray.append(float("{:3.4f}".format(float(TC.ra.deg)))) DECarray.append(float("{:3.4f}".format(float(TC.dec.deg)))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 elif P_GW >= obspar.minProbcut: Round.append(1) P_GWarray.append(float("{:1.4f}".format(float(P_GW)))) RAarray.append(float("{:3.4f}".format(float(TC.ra.deg)))) DECarray.append(float("{:3.4f}".format(float(TC.dec.deg)))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: break logger.info( f"\nTotal GW probability covered: {float(sum(P_GWarray)):1.4f}\nNumber of runs that fulfill darkness condition: {len(NightDarkRuns)}\nNumber of effective pointings: {len(ObservationTimearray)}" ) # List of suggested pointings SuggestedPointings = Table( [ ObservationTimearray, RAarray, DECarray, P_GWarray, Round, ObsName, Duration, Fov_obs, ], names=[ "Time[UTC]", "RA[deg]", "DEC[deg]", "PGW", "Round", "ObsName", "Duration", "FoV", ], ) if len(SuggestedPointings) != 0: logger.info( "\n================================= Tiling found =============================================" ) logger.info(SuggestedPointings) logger.info( "============================================================================================\n" ) logger.info(f"The total probability PGW: {np.sum(P_GWarray):.4f}") return (SuggestedPointings, ObservationTime0)
[docs] def PGalinFoV(skymap, nameEvent, galFile, obspar, dirName): """ Compute an observation schedule based on a 3D (galaxy-weighted) method. This mid-level function is called by :func:`tilepy.include.observationschedule.GetSchedule` and produces a suggested schedule of pointings for the given observatory, using either the target galaxy strategy or the integrated galaxy probability strategy, depending on user input and the telescope field of view. Parameters ---------- skymap : SkyMap The object storing sky maps. nameEvent : str The name of the event. galFile : str Path to the galaxy catalog. obspar : ObservationParameters Telescope configuration parameters used in the scheduling. dirName : str Path to the output directory where the schedules and plots will be saved. Returns ------- SuggestedPointings : astropy.table.Table Table of suggested pointings (with time, coordinates, probability, etc.). tGals0 : astropy.table.Table Filtered and ranked list of galaxies for scheduling. """ # The desired time for scheduling to start ObservationTime0 = obspar.obsTime # The path to the text file containing the pointings that have already been performed before the scheduling PointingFile = obspar.pointingsFile # Main Parameters logger.info(f"Obspar:\n{obspar}") # load galaxy catalog from local file if not obspar.mangrove: cat = LoadGalaxies(galFile) else: cat = LoadGalaxies_SteMgal(galFile) nside = obspar.HRnside prob = skymap.getMap("prob", obspar.HRnside) if skymap.is3D: logger.info("Skymap is 3D") else: logger.info("Skymap is 2D") # correlate GW map with galaxy catalog, retrieve ordered list if not obspar.mangrove: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) sum_dP_dV = cat["dp_dV"].sum() else: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) tGals0 = MangroveGalaxiesProbabilities(tGals0) sum_dP_dV = cat["dp_dV"].sum() alreadysumipixarray1 = [] alreadysumipixarray2 = [] ######################### maxRuns = obspar.maxRuns if PointingFile is None: tGals = tGals0 logger.info("No pointings were given to be subtracted") else: ( ra, dec, tGals, AlreadyObservedPgw, AlreadyObservedPgal, alreadysumipixarray1, doneObs, ) = SubstractPointings( PointingFile, tGals0, alreadysumipixarray1, sum_dP_dV, prob, obspar, nside ) sumPGW = sum(AlreadyObservedPgw) sumPGAL = sum(AlreadyObservedPgal) # for second round # ra, dec, tGals, AlreadyObservedPgw, AlreadyObservedPgal,alreadysumipixarray2, doneObs = SubstractPointings(PointingFile, tGals0,alreadysumipixarray1,sum_dP_dV,prob, obspar, nside) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info( f"Total GW probability already covered: {sumPGW}, " f"Total Gal probability already covered: {sumPGAL}" ) logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ########################## tGals_aux = tGals tGals_aux2 = tGals P_GALarray = [] P_GWarray = [] ObservationTimearray = [] RAarray = [] DECarray = [] ObsName = [] Duration = [] Fov_obs = [] Round = [] logger.info("---------- NEW FOLLOW-UP ATTEMPT ----------") logger.info( f"maxRuns: {maxRuns}, MinimumProbCutForCatalogue: {obspar.minimumProbCutForCatalogue}" ) if obspar.useGreytime: NightDarkRuns = NightDarkObservationwithGreyTime(ObservationTime0, obspar) else: NightDarkRuns = NightDarkObservation(ObservationTime0, obspar) counter = 0 if obspar.strategy == "integrated": for j, NightDarkRun in enumerate(NightDarkRuns): if len(ObservationTimearray) < maxRuns: ObservationTime = NightDarkRun visible, altaz, tGals_aux = VisibleAtTime( ObservationTime, tGals_aux, obspar ) if visible: # select galaxies within the slightly enlarged visibility window visiMask = altaz.alt.value > 90 - (obspar.maxZenith + obspar.FOV) visiGals = tGals_aux[visiMask] visiGals = ModifyCatalogue( prob, visiGals, obspar.FOV, sum_dP_dV, nside ) mask, minz = FulfillsRequirement(visiGals, obspar, UsePix=False) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals, obspar ) finalGals = visiGals[mask & maskgrey] if not obspar.useGreytime: finalGals = visiGals[mask] if not len(finalGals["dp_dV_FOV"]): logger.info( f"Condition not met at {ObservationTime}: no surviving galaxies." ) continue if finalGals["dp_dV_FOV"][0] > obspar.minProbcut: # final galaxies within the FoV # print(f"Condition met at {ObservationTime}: dp/dV_FOV = {finalGals['dp_dV_FOV'][0]} is greater than {obspar.minProbcut}") if ( (finalGals["dp_dV_FOV"][0] < (2 * obspar.minProbcut)) and (sum(P_GWarray) > 0.40) and obspar.secondRound ): visible, altaz, tGals_aux2 = VisibleAtTime( ObservationTime, tGals_aux2, obspar ) if visible: visiMask = altaz.alt.value > 90 - ( obspar.maxZenith + obspar.FOV ) visiGals2 = tGals_aux2[visiMask] visiGals2 = ModifyCatalogue( prob, visiGals2, obspar.FOV, sum_dP_dV, nside ) mask, minz = FulfillsRequirement( visiGals2, obspar, UsePix=False ) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals2, obspar ) finalGals2 = visiGals2[mask & maskgrey] if not obspar.useGreytime: finalGals2 = visiGals2[mask] p_gal, p_gw, tGals_aux2, alreadysumipixarray2 = ( ComputeProbPGALIntegrateFoV( prob, ObservationTime, obspar.location, finalGals2, False, visiGals2, tGals_aux2, sum_dP_dV, alreadysumipixarray2, nside, minz, obspar, counter, nameEvent, dirName, obspar.doPlot, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals2["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals2["DEJ2000"][:1]) ) ) ) Round.append(2) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbPGALIntegrateFoV( prob, ObservationTime, obspar.location, finalGals, False, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, nameEvent, dirName, obspar.doPlot, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals["DEJ2000"][:1]) ) ) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: # print("We are in round 1") # print("\n=================================") # print("TARGET COORDINATES AND DETAILS...") # print("=================================") p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbPGALIntegrateFoV( prob, ObservationTime, obspar.location, finalGals, False, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, nameEvent, dirName, obspar.doPlot, ) ) RAarray.append( float("{:3.4f}".format(float(finalGals["RAJ2000"][:1]))) ) DECarray.append( float("{:3.4f}".format(float(finalGals["DEJ2000"][:1]))) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: logger.info( f"Condition not met at {ObservationTime}: dp/dV_FOV = {finalGals['dp_dV_FOV'][0]} must be greater than {obspar.minProbcut}" ) else: break if obspar.strategy == "targeted": for j, NightDarkRun in enumerate(NightDarkRuns): if len(ObservationTimearray) < maxRuns: ObservationTime = NightDarkRun visible, altaz, tGals_aux = VisibleAtTime( ObservationTime, tGals_aux, obspar ) if visible: # select galaxies within the slightly enlarged visibility window visiMask = altaz.alt.value > 90 - (obspar.maxZenith + obspar.FOV) visiGals = tGals_aux[visiMask] mask, minz = FulfillsRequirement(visiGals, obspar, UsePix=False) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals, obspar ) finalGals = visiGals[mask & maskgrey] if not obspar.useGreytime: finalGals = visiGals[mask] # print('finalGals', finalGals,tGals['dp_dV'][:1]*obspar.minProbcut) if finalGals["dp_dV"][0] > tGals["dp_dV"][0] * obspar.minProbcut: # print(f"Condition met at {ObservationTime}: dp/dV_FOV = {finalGals['dp_dV'][0]} is greater than {obspar.minProbcut}") if ( (finalGals["dp_dV"][0] < (2 * obspar.minProbcut)) and (sum(P_GWarray) > 0.40) and obspar.secondRound ): visible, altaz, tGals_aux2 = VisibleAtTime( ObservationTime, tGals_aux2, obspar ) if visible: visiMask = altaz.alt.value > 90 - ( obspar.maxZenith + obspar.FOV ) visiGals2 = tGals_aux2[visiMask] mask, minz = FulfillsRequirement( visiGals2, obspar, UsePix=False ) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals2, obspar ) finalGals2 = visiGals2[mask & maskgrey] if not obspar.useGreytime: finalGals2 = visiGals2[mask] p_gal, p_gw, tGals_aux2, alreadysumipixarray2 = ( ComputeProbGalTargeted( prob, ObservationTime, finalGals2, visiGals2, tGals_aux2, sum_dP_dV, alreadysumipixarray2, nside, minz, obspar, counter, dirName, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals2["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals2["DEJ2000"][:1]) ) ) ) Round.append(2) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbGalTargeted( prob, ObservationTime, finalGals, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, dirName, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals["DEJ2000"][:1]) ) ) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: # print("We are in round 1") p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbGalTargeted( prob, ObservationTime, finalGals, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, dirName, ) ) RAarray.append( float("{:3.4f}".format(float(finalGals["RAJ2000"][:1]))) ) DECarray.append( float("{:3.4f}".format(float(finalGals["DEJ2000"][:1]))) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) counter = counter + 1 else: logger.info( f"Condition not met at {ObservationTime}: dp/dV = {finalGals['dp_dV'][0]} must be greater than {obspar.minProbcut}" ) else: break # List of suggested pointings SuggestedPointings = Table( [ ObservationTimearray, RAarray, DECarray, P_GWarray, P_GALarray, Round, ObsName, Duration, Fov_obs, ], names=[ "Time[UTC]", "RA[deg]", "DEC[deg]", "PGW", "Pgal", "Round", "ObsName", "Duration", "FoV", ], ) if len(SuggestedPointings) != 0: logger.info( "\n================================= Tiling found =============================================" ) logger.info(SuggestedPointings) logger.info( "============================================================================================\n" ) logger.info(f"The total probability PGal: {np.sum(P_GALarray):.4f}") logger.info(f"The total probability PGW: {np.sum(P_GWarray):.4f}") return SuggestedPointings, tGals0
[docs] def ObservationStartperObs(obsparameters, ObservationTime0): """ Compute the first observation time for each observatory involved in the scheduling. This mid-level function is called by Nobs Tiling functions to determine the first available observation time for each observatory and whether each observatory can observe on the same night. Parameters ---------- obsparameters : list of ObservationParameters A list of sets of parameters for each observatory needed to launch the tiling scheduler. ObservationTime0 : sr The desired start time for scheduling to begin. Returns ------- obs_time : datetime The current observation time, possibly adjusted. SameNight : numpy.ndarray of bool An array indicating whether each observatory is available for observation on the same night. NewActiveObs : list of ObservationParameters A list of observatories that are available to observe. NewActiveObsStart : numpy.ndarray of datetime A sorted list of the first available observation times for each observatory. """ logger.info(f"ObservationTime0: {ObservationTime0}") logger.info(f"obsparameters: {len(obsparameters)}") # Finding the start time for each observatory and checking if it's now FirstDark = np.full(len(obsparameters), False, dtype=bool) FirstDark_Flag = np.full(len(obsparameters), False, dtype=bool) # print(len(ObsFirstTime)) obs_time = ObservationTime0 if obs_time.tzinfo is None: obs_time = utc.localize(obs_time) ObsFirstTime = [] j = 0 for obspar1 in obsparameters: if obsparameters[j].base == "space": dark_at_start = True FirstDark[j] = dark_at_start else: dark_at_start = False if obsparameters[j].useGreytime: dark_at_start = Tools.CheckWindowGrey(obs_time, obsparameters[j]) if not obsparameters[j].useGreytime: dark_at_start = Tools.CheckWindow(obs_time, obsparameters[j]) FirstDark[j] = dark_at_start # THIS WILL CREATE A DATETIME OBJECT WITH IN THE FORM XX+00:00 WITH NO DOTS if FirstDark[j]: FirstDark_Flag[j] = True if obs_time.tzinfo is None: obs_time = utc.localize(obs_time) ObsFirstTime.append(obs_time) else: # THIS WILL CREATE A DATETIME OBJECT WITH IN THE FORM .XX+00:00 if obsparameters[j].useGreytime: ObsFirstTime1 = NextWindowTools.NextObservationWindowGrey( time=obs_time, obspar=obsparameters[j] ) ObsFirstTime.append(ObsFirstTime1) if not obsparameters[j].useGreytime: ObsFirstTime1 = NextWindowTools.NextObservationWindow( time=obs_time, obspar=obsparameters[j] ) ObsFirstTime.append(ObsFirstTime1) if ObsFirstTime1: if ObsFirstTime1.tzinfo is None: ObsFirstTime1 = utc.localize(ObsFirstTime1) if obs_time.tzinfo is None: obs_time = utc.localize(obs_time) if ObsFirstTime1 < obs_time + datetime.timedelta(hours=24): FirstDark_Flag[j] = True j += 1 # Checking which observatories are available for observations and saving their start time ActiveObsStart = [] ActiveObs = [] SameNight = np.full(len(obsparameters), False, dtype=bool) j = 0 for obspar in obsparameters: if FirstDark_Flag[j]: if ObsFirstTime[j].tzinfo is None: ObsFirstTime = utc.localize(ObsFirstTime[j]) ActiveObsStart.append(ObsFirstTime[j]) ActiveObs.append(obsparameters[j]) SameNight[j] = True j += 1 # Sorting observatories according to their first obsevation time available NewActiveObsStart = np.sort(ActiveObsStart) NewActiveObs = ActiveObs ind = np.argsort(ActiveObsStart) ind = np.array(ind) NewActiveObs = np.take(ActiveObs, ind) return obs_time, SameNight, NewActiveObs, NewActiveObsStart
[docs] def PGWinFoV_NObs( skymap, nameEvent, ObservationTime0, PointingFile, obsparameters, dirName ): """ Compute an observation schedule for multiple telescopes/observatories based on a 2D method. This mid-level function is called by `GetSchedule` (see :func:`tilepy.include.ObservationScheduler.GetSchedule`) and produces a suggested schedule of pointings for each observatory, given the input sky map, pointings already performed, and observation parameters. Parameters ---------- skymap : SkyMap The object storing sky maps. nameEvent : str The name of the event. ObservationTime0 : str The desired start time for scheduling. PointingFile : str Path to the text file containing pointings already performed before scheduling. obsparameters : list of ObservationParameters Parameters for each observatory needed to launch the tiling scheduler. dirName : str Path to the output directory where the schedules and plots will be saved. Returns ------- SuggestedPointings : astropy.table.Table Table of suggested pointings for all observatories. obsparameters : list of ObservationParameters (Possibly updated) list of parameters for each observatory. """ obs_time, SameNight, NewActiveObs, NewActiveObsStart = ObservationStartperObs( obsparameters, ObservationTime0 ) # START ################################################################################################################################################# random.seed() RAarray = [] DECarray = [] pixlist = [] pixlistHR = [] pixlist1 = [] pixlistHR1 = [] P_GWarray = [] ObservationTimearray = [] Round = [] ObsName = [] Duration = [] Fov_obs = [] ################################################################################################################################################# obspar = obsparameters[0] # Retrieve maps prob = skymap.getMap("prob", obspar.reducedNside) highres = skymap.getMap("prob", obspar.HRnside) # Create table for 2D probability at 90% containment rapix, decpix, areapix = GetRegionPixReduced( prob, obspar.percentageMOC, obspar.reducedNside ) radecs = co.SkyCoord(rapix, decpix, frame="icrs", unit=(u.deg, u.deg)) maxRuns = obspar.maxRuns # Add observed pixels to pixlist if PointingFile is not None: print(PointingFile, prob, obspar.reducedNside, obspar.FOV, pixlist) pixlist, pixlistHR, sumPGW, doneObs = SubstractPointings2D( PointingFile, prob, obspar, pixlist, pixlistHR ) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info(f"Total GW probability already covered: {sumPGW}") logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ################################################################################################################################################# ITERATION_OBS = 0 TIME_MIN_ALL = [] TIME_MIN = obs_time + datetime.timedelta(hours=12) if TIME_MIN.tzinfo is None: TIME_MIN = utc.localize(TIME_MIN) NewActiveObsTime = NewActiveObsStart NUMBER_OBS = np.zeros(len(NewActiveObs)) ################################################################################################################################################# counter = 0 i = 0 couter_per_obs = np.zeros(len(NewActiveObs)) logger.info(f"------NewActiveObsTime--------\n{NewActiveObs[0].obs_name}") while (i < 500) & any(SameNight): for j, obs in enumerate(NewActiveObs): obspar = NewActiveObs[j] logger.info(f"Observatory: {obspar.obs_name}") ObservationTime = NewActiveObsTime[j] if ITERATION_OBS == len(obsparameters): TIME_MIN_ALL = [] ITERATION_OBS = 0 ITERATION_OBS += 1 if obspar.base == "space": SatelliteName = GetSatelliteName(obspar.obs_name, obspar.stationsurl) if couter_per_obs[j] >= maxRuns: SameNight[j] = False if (TIME_MIN >= NewActiveObsTime[j]) & SameNight[j]: pixlistHROcc = None if obspar.base == "space": SatelliteTime = GetSatelliteTime(SatelliteName, ObservationTime) satellitePosition, satelliteLocation = GetSatellitePositions( SatelliteName, SatelliteTime ) ObsBool, yprob, pixlistHROcc = OccultationCut( prob, obspar.reducedNside, ObservationTime, obspar, satellitePosition, satelliteLocation, ) else: ObsBool, yprob = ZenithAngleCut(prob, ObservationTime, obspar) if ObsBool: # Round 1 P_GW, TC, pixlist, pixlistHR = ComputeProbability2D( obspar, prob, highres, radecs, ObservationTime, pixlist, pixlistHR, counter, dirName, pixlistHROcc, ) # print(P_GW, obspar.name) if (P_GW <= obspar.minProbcut) and obspar.secondRound: # Try Round 2 # print('The minimum probability cut being', minProbcut * 100, '% is, unfortunately, not reached.') yprob1 = highres P_GW, TC, pixlist1, pixlistHR1 = ComputeProbability2D( prob, yprob1, radecs, ObservationTime, pixlist1, pixlistHR1, counter, dirName, pixlistHROcc, ) if P_GW <= obspar.minProbcut: logger.info( "Tile Pgw= ", P_GW, " is smaller than the minProbCut (", obspar.minProbCut, ") => skip this tile", ) else: Round.append(2) P_GWarray.append(float("{:1.4f}".format(float(P_GW)))) RAarray.append(float("{:3.4f}".format(float(TC.ra.deg)))) DECarray.append(float("{:3.4f}".format(float(TC.dec.deg)))) ObservationTime = str(ObservationTime).split("+")[0] try: ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) except ValueError: ObservationTime = str(ObservationTime).split(".")[0] ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) ObservationTimearray.append(ObservationTime) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) elif P_GW >= obspar.minProbcut: Round.append(1) P_GWarray.append(float("{:1.4f}".format(float(P_GW)))) RAarray.append(float("{:3.4f}".format(float(TC.ra.deg)))) DECarray.append(float("{:3.4f}".format(float(TC.dec.deg)))) ObservationTime = str(ObservationTime).split("+")[0] try: ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) except ValueError: ObservationTime = str(ObservationTime).split(".")[0] ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) ObservationTimearray.append(ObservationTime) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) # HERE WE DETERMINE THE OBSERVATION DURATION ... FOR NOW WE USE 30 MINS FOR ALL NewActiveObsTime[j] = NewActiveObsTime[j] + datetime.timedelta( minutes=obspar.duration ) # HERE WE DETERMINE IF WE ARE STILL IN THE SAME NIGHT FOR THIS OBSERVATORY # if (NewActiveObsTime[j] > Tools.NextSunrise(obsstart, obspar)) | (obsstart > Tools.NextMoonrise(obsstart, obspar)): if obspar.base == "space": if NewActiveObsTime[j] > obs_time + datetime.timedelta(hours=24): SameNight[j] = False else: if not obsparameters[j].useGreytime: if not Tools.CheckWindow(NewActiveObsTime[j], obspar): SameNight[j] = False if obsparameters[j].useGreytime: if not Tools.CheckWindowGrey(NewActiveObsTime[j], obspar): SameNight[j] = False if obspar.sunDown > 10: if NewActiveObsTime[j] > obs_time + datetime.timedelta( hours=24 ): SameNight[j] = False NUMBER_OBS[j] += 1 if SameNight[j]: TIME_MIN = NewActiveObsTime[j] TIME_MIN_ALL.append(TIME_MIN) TIME_MIN = np.min(TIME_MIN_ALL) else: TIME_MIN = TIME_MIN + datetime.timedelta(hours=12) i += 1 SuggestedPointings = Table( [ ObservationTimearray, RAarray, DECarray, P_GWarray, Round, ObsName, Duration, Fov_obs, ], names=[ "Time[UTC]", "RA[deg]", "DEC[deg]", "PGW", "Round", "ObsName", "Duration", "FoV", ], ) logger.info("The total probability PGW: ", np.sum(P_GWarray)) return SuggestedPointings, obsparameters
[docs] def PGalinFoV_NObs( skymap, nameEvent, ObservationTime0, PointingFile, galFile, obsparameters, dirName ): """ Compute the optimal observation schedule based on galaxy probability and gravitational wave data for multiple observatories. """ obs_time, SameNight, NewActiveObs, NewActiveObsStart = ObservationStartperObs( obsparameters, ObservationTime0 ) # START ################################################################################################################################################# random.seed() P_GALarray = [] P_GWarray = [] ObservationTimearray = [] RAarray = [] DECarray = [] alreadysumipixarray1 = [] alreadysumipixarray2 = [] Round = [] ObsName = [] Duration = [] Fov_obs = [] ################################################################################################################################################# obspar = obsparameters[0] nside = obspar.HRnside prob = skymap.getMap("prob", obspar.HRnside) # load galaxy catalogue if not obspar.mangrove: cat = LoadGalaxies(galFile) else: cat = LoadGalaxies_SteMgal(galFile) # correlate GW map with galaxy catalog, retrieve ordered list if not obspar.mangrove: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) sum_dP_dV = cat["dp_dV"].sum() else: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) tGals0 = MangroveGalaxiesProbabilities(tGals0) sum_dP_dV = cat["dp_dV"].sum() # Add observed pixels to pixlist maxRuns = obspar.maxRuns if PointingFile is None: tGals = tGals0 logger.info("No pointings were given to be subtracted") else: # tGals_aux = tGals ( ra, dec, tGals, AlreadyObservedPgw, AlreadyObservedPgal, alreadysumipixarray1, doneObs, ) = SubstractPointings( PointingFile, tGals0, alreadysumipixarray1, sum_dP_dV, prob, obspar, nside ) maxRuns = obspar.maxRuns - len(ra) sumPGW = sum(AlreadyObservedPgw) sumPGAL = sum(AlreadyObservedPgal) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info( f"Total GW probability already covered: {sumPGW}, " f"Total Gal probability already covered: {sumPGAL}" ) logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) tGals_aux = tGals tGals_aux2 = tGals ################################################################################################################################################# ITERATION_OBS = 0 TIME_MIN_ALL = [] TIME_MIN = obs_time + datetime.timedelta(hours=12) if TIME_MIN.tzinfo is None: TIME_MIN = utc.localize(TIME_MIN) NewActiveObsTime = NewActiveObsStart NUMBER_OBS = np.zeros(len(NewActiveObs)) ################################################################################################################################################# counter = 0 i = 0 couter_per_obs = np.zeros(len(NewActiveObs)) while (i < 5000) & any(SameNight): for j, obs in enumerate(NewActiveObs): obspar = NewActiveObs[j] ObservationTime = NewActiveObsTime[j] if ITERATION_OBS == len(obsparameters): TIME_MIN_ALL = [] ITERATION_OBS = 0 ITERATION_OBS += 1 if couter_per_obs[j] >= maxRuns: SameNight[j] = False if (TIME_MIN >= NewActiveObsTime[j]) & SameNight[j]: if obspar.strategy == "integrated": visible, altaz, tGals_aux = VisibleAtTime( ObservationTime, tGals_aux, obspar ) if visible: # select galaxies within the slightly enlarged visibility window visiMask = altaz.alt.value > 90 - ( obspar.maxZenith + obspar.FOV ) visiGals = tGals_aux[visiMask] visiGals = ModifyCatalogue( prob, visiGals, obspar.FOV, sum_dP_dV, nside ) mask, minz = FulfillsRequirement(visiGals, obspar, UsePix=False) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals, obspar ) finalGals = visiGals[mask & maskgrey] if not obspar.useGreytime: finalGals = visiGals[mask] if finalGals["dp_dV_FOV"][0] > obspar.minProbcut: # final galaxies within the FoV # print(f"Condition met at {ObservationTime}: dp/dV_FOV = {finalGals['dp_dV_FOV'][0]} is greater than {obspar.minProbcut}") if ( (finalGals["dp_dV_FOV"][0] < (2 * obspar.minProbcut)) and (sum(P_GWarray) > 0.40) and obspar.secondRound ): logger.info( f"Probability: {finalGals['dp_dV_FOV'][:1]}" ) visible, altaz, tGals_aux2 = VisibleAtTime( ObservationTime, tGals_aux2, obspar ) if visible: visiMask = altaz.alt.value > 90 - ( obspar.maxZenith + obspar.FOV ) visiGals2 = tGals_aux2[visiMask] visiGals2 = ModifyCatalogue( prob, visiGals2, obspar.FOV, sum_dP_dV, nside ) mask, minz = FulfillsRequirement( visiGals2, obspar, UsePix=False ) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals2, obspar ) finalGals2 = visiGals2[mask & maskgrey] if not obspar.useGreytime: finalGals2 = visiGals2[mask] p_gal, p_gw, tGals_aux2, alreadysumipixarray2 = ( ComputeProbPGALIntegrateFoV( prob, ObservationTime, obspar.location, finalGals2, False, visiGals2, tGals_aux2, sum_dP_dV, alreadysumipixarray2, nside, minz, obspar, counter, nameEvent, dirName, obspar.doPlot, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals2["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals2["DEJ2000"][:1]) ) ) ) Round.append(2) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTime = str(ObservationTime).split(".")[0] try: ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) except Exception: ObservationTime = str(ObservationTime).split( "+" )[0] ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) ObservationTimearray.append(ObservationTime) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) else: # print("\n=================================") # print("TARGET COORDINATES AND DETAILS...") # print("=================================") # print(finalGals['RAJ2000', 'DEJ2000', 'Bmag', 'Dist', 'Alt', 'dp_dV','dp_dV_FOV'][:1]) p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbPGALIntegrateFoV( prob, ObservationTime, obspar.location, finalGals, False, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, nameEvent, dirName, obspar.doPlot, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals["DEJ2000"][:1]) ) ) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTime = str(ObservationTime).split(".")[0] try: ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) except Exception: ObservationTime = str(ObservationTime).split("+")[0] ObservationTime = datetime.datetime.strptime( ObservationTime, "%Y-%m-%d %H:%M:%S" ) ObservationTimearray.append(ObservationTime) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) # ObservationTimearrayNamibia.append(Tools.UTCtoNamibia(ObservationTime)) else: logger.info( f"Condition NOT met at {ObservationTime}: dp/dV_FOV = {finalGals['dp_dV_FOV'][0]} is greater than {obspar.minProbcut}" ) if obspar.strategy == "targeted": visible, altaz, tGals_aux = VisibleAtTime( ObservationTime, tGals_aux, obspar ) if visible: # select galaxies within the slightly enlarged visibility window visiMask = altaz.alt.value > 90 - ( obspar.maxZenith + obspar.FOV ) visiGals = tGals_aux[visiMask] mask, minz = FulfillsRequirement(visiGals, obspar, UsePix=False) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals, obspar.location, obspar.minMoonSourceSeparation, ) finalGals = visiGals[mask & maskgrey] if not obspar.useGreytime: finalGals = visiGals[mask] # print('finalGals', finalGals,tGals['dp_dV'][:1]*obspar.minProbcut) if ( finalGals["dp_dV"][0] > tGals["dp_dV"][:1] * obspar.minProbcut ): logger.info( f"Condition met at {ObservationTime}: dp/dV = {finalGals['dp_dV'][0]} is greater than {obspar.minProbcut}" ) if ( (finalGals["dp_dV"][0] < (2 * obspar.minProbcut)) and (sum(P_GWarray) > 0.40) and obspar.secondRound ): visible, altaz, tGals_aux2 = VisibleAtTime( ObservationTime, tGals_aux2, obspar ) if visible: visiMask = altaz.alt.value > 90 - ( obspar.maxZenith + obspar.FOV ) visiGals2 = tGals_aux2[visiMask] mask, minz = FulfillsRequirement( visiGals2, obspar, UsePix=False ) if obspar.useGreytime: maskgrey = FulfillsRequirementGreyObservations( ObservationTime, visiGals2, obspar ) finalGals2 = visiGals2[mask & maskgrey] if not obspar.useGreytime: finalGals2 = visiGals2[mask] p_gal, p_gw, tGals_aux2, alreadysumipixarray2 = ( ComputeProbGalTargeted( prob, ObservationTime, finalGals2, visiGals2, tGals_aux2, sum_dP_dV, alreadysumipixarray2, nside, minz, obspar, counter, dirName, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals2["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals2["DEJ2000"][:1]) ) ) ) Round.append(2) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) else: p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbGalTargeted( prob, ObservationTime, finalGals, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, dirName, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals["DEJ2000"][:1]) ) ) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) else: # print("We are in round 1") p_gal, p_gw, tGals_aux, alreadysumipixarray1 = ( ComputeProbGalTargeted( prob, ObservationTime, finalGals, visiGals, tGals_aux, sum_dP_dV, alreadysumipixarray1, nside, minz, obspar, counter, dirName, ) ) RAarray.append( float( "{:3.4f}".format( float(finalGals["RAJ2000"][:1]) ) ) ) DECarray.append( float( "{:3.4f}".format( float(finalGals["DEJ2000"][:1]) ) ) ) Round.append(1) P_GALarray.append(float("{:1.4f}".format(p_gal))) P_GWarray.append(float("{:1.4f}".format(p_gw))) ObservationTimearray.append( ObservationTime.strftime("%Y-%m-%d %H:%M:%S") ) ObsName.append(obspar.obs_name) counter = counter + 1 couter_per_obs[j] += 1 Duration.append(obspar.duration) Fov_obs.append(obspar.FOV) else: logger.info( f"Condition NOT met at {ObservationTime}: dp/dV = {finalGals['dp_dV'][0]} is greater than {obspar.minProbcut}" ) # HERE WE DETERMINE THE OBSERVATION DURATION ... FOR NOW WE USE 30 MINS FOR ALL NewActiveObsTime[j] = NewActiveObsTime[j] + datetime.timedelta( minutes=obspar.duration ) # HERE WE DETERMINE IF WE ARE STILL IN THE SAME NIGHT FOR THIS OBSERVATORY # if (NewActiveObsTime[j] > Tools.NextSunrise(NewActiveObsStart[j], NewActiveObs[j])) | (NewActiveObsStart[j] > Tools.NextMoonrise(obs_time, NewActiveObs[j])): if not obsparameters[j].useGreytime: if not Tools.CheckWindow(NewActiveObsTime[j], obspar): SameNight[j] = False if obsparameters[j].useGreytime: if not Tools.CheckWindowGrey(NewActiveObsTime[j], obspar): SameNight[j] = False if obspar.sunDown > 10: if NewActiveObsTime[j] > obs_time + datetime.timedelta(hours=24): SameNight[j] = False NUMBER_OBS[j] += 1 if SameNight[j]: TIME_MIN = NewActiveObsTime[j] TIME_MIN_ALL.append(TIME_MIN) TIME_MIN = np.min(TIME_MIN_ALL) else: TIME_MIN = TIME_MIN + datetime.timedelta(hours=12) i += 1 SuggestedPointings = Table( [ ObservationTimearray, RAarray, DECarray, P_GWarray, P_GALarray, Round, ObsName, Duration, Fov_obs, ], names=[ "Time[UTC]", "RA[deg]", "DEC[deg]", "PGW", "Pgal", "Round", "ObsName", "Duration", "FoV", ], ) logger.info(f"The total probability PGal: {np.sum(P_GALarray)}") logger.info(f"The total probability PGW: {np.sum(P_GWarray)}") return SuggestedPointings, tGals0, obsparameters
def GetBestTiles2D(skymap, nameEvent, PointingFile, obsparameters, dirName): """ Compute the best observation tiles based on a 2D method, considering galaxy probabilities and observatory constraints. """ random.seed() RAarray = [] DECarray = [] P_GWarray = [] Occultedpixels = [] pixlistHR = [] obspar = obsparameters[0] radius = obspar.FOV HRnside, reducedNside = GetBestNSIDE(obspar.reducedNside, obspar.HRnside, radius) # Retrieve maps prob = skymap.getMap("prob", reducedNside) highres = skymap.getMap("prob", HRnside) # Create table for 2D probability at 90% containment rapix, decpix, _ = GetRegionPixReduced(prob, obspar.percentageMOC, reducedNside) radecs = co.SkyCoord(rapix, decpix, frame="icrs", unit=(u.deg, u.deg)) maxRuns = obspar.maxRuns doPlot = obspar.doPlot # Add observed pixels to pixlist if PointingFile is not None: # FIXME: pixlist is undefined in this scope # The program will crash is the if branch is executed print( PointingFile, prob, obspar.reducedNside, obspar.FOV, pixlist, # noqa: F821 ) pixlist, pixlistHR, sumPGW, doneObs = SubstractPointings2D( PointingFile, prob, obspar, pixlist, # noqa: F821 pixlistHR, # noqa: F821 ) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info(f"Total GW probability already covered: {sumPGW}") logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ipix = TransformRADecToPix(radecs, reducedNside) newpix = ipix first_values = GetBestGridPos2D( prob, highres, HRnside, reducedNside, newpix, radius, maxRuns, Occultedpixels, doPlot, dirName, obspar.numberSides, pixlistHR, obspar.minProbcut, ) # ObsName = [obspar.name for j in range(len(first_values))] RAarray = [row["PIXRA"] for row in first_values] DECarray = [row["PIXDEC"] for row in first_values] P_GWarray = [row["PIXFOVPROB"] for row in first_values] SuggestedPointings = Table( [RAarray, DECarray, P_GWarray], names=["RA[deg]", "DEC[deg]", "PGW"] ) return SuggestedPointings def GetBestTiles3D(skymap, nameEvent, PointingFile, galFile, obsparameters, dirName): """ Compute the best observation tiles based on a 3D method for space-based observatories, considering galaxy probabilities and satellite constraints. """ random.seed() RAarray = [] DECarray = [] pixlist = [] pixlistHR = [] ObsName = [] Occultedpixels = [] obspar = obsparameters[0] radius = obspar.FOV HRnside, reducedNside = GetBestNSIDE(obspar.reducedNside, obspar.HRnside, radius) maxRuns = obspar.maxRuns # Retrieve maps prob = skymap.getMap("prob", reducedNside) # Create table for 2D probability at 90% containment rapix, decpix, areapix = GetRegionPixReduced( prob, obspar.percentageMOC, reducedNside ) radecs = co.SkyCoord(rapix, decpix, frame="icrs", unit=(u.deg, u.deg)) maxRuns = obspar.maxRuns doPlot = obspar.doPlot # load galaxy catalogue if not obspar.mangrove: cat = LoadGalaxies(galFile) else: cat = LoadGalaxies_SteMgal(galFile) # correlate GW map with galaxy catalog, retrieve ordered list if not obspar.mangrove: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) sum_dP_dV = cat["dp_dV"].sum() else: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) tGals0 = MangroveGalaxiesProbabilities(tGals0) sum_dP_dV = cat["dp_dV"].sum() # Add observed pixels to pixlist if PointingFile is not None: print(PointingFile, prob, obspar.reducedNside, obspar.FOV, pixlist) pixlist, pixlistHR, sumPGW, doneObs = SubstractPointings2D( PointingFile, prob, obspar, pixlist, pixlistHR ) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info(f"Total GW probability already covered: {sumPGW}") logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ipix = TransformRADecToPix(radecs, reducedNside) newpix = ipix # CONVERTING newpix to angles on the coordinate grid pixradec = TransformPixToRaDec(newpix, reducedNside) first_values = GetBestGridPos3D( prob, tGals0, pixradec, newpix, radius, sum_dP_dV, HRnside, obspar.numberSides, maxRuns, doPlot, dirName, reducedNside, Occultedpixels, obspar.minProbcut, ) ObsName = [obspar.obs_name for j in range(len(first_values))] RAarray = [row["PIXRA"] for row in first_values] DECarray = [row["PIXDEC"] for row in first_values] P_Galarray = [row["PIXFOVPROB"] for row in first_values] SuggestedPointings = Table( [ObsName, RAarray, DECarray, P_Galarray], names=["ObsName", "RA[deg]", "DEC[deg]", "PGal"], ) return SuggestedPointings
[docs] def PGWinFoV_Space_NObs( skymap, nameEvent, ObservationTime0, PointingFile, obsparameters, dirName ): """ Compute an observation schedule for space-based observatories using a 3D method. It calculates optimal observation times considering the galaxy catalog, GW probability map, and satellite constraints. The function returns suggested pointings along with satellite visibility times and SAA status. """ random.seed() RAarray = [] DECarray = [] pixlist = [] pixlistHR = [] P_GWarray = [] ObsName = [] Occultedpixels = [] obspar = obsparameters[0] radius = obspar.FOV HRnside, reducedNside = GetBestNSIDE(obspar.reducedNside, obspar.HRnside, radius) ################################################################################################################################################# # Retrieve maps prob = skymap.getMap("prob", reducedNside) highres = skymap.getMap("prob", HRnside) # Create table for 2D probability at 90% containment rapix, decpix, areapix = GetRegionPixReduced( prob, obspar.percentageMOC, reducedNside ) radecs = co.SkyCoord(rapix, decpix, frame="icrs", unit=(u.deg, u.deg)) maxRuns = obspar.maxRuns doPlot = obspar.doPlot # Add observed pixels to pixlist if PointingFile is not None: print(PointingFile, prob, obspar.reducedNside, obspar.FOV, pixlist) pixlist, pixlistHR, sumPGW, doneObs = SubstractPointings2D( PointingFile, prob, obspar, pixlist, pixlistHR ) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info(f"Total GW probability already covered: {sumPGW}") logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ipix = TransformRADecToPix(radecs, reducedNside) newpix = ipix first_values1 = GetBestGridPos2D( prob, highres, HRnside, reducedNside, newpix, radius, maxRuns, Occultedpixels, doPlot, dirName, obspar.numberSides, pixlistHR, obspar.minProbcut, ) # FOR SPACE ######################################################## # Computing the satellite name SatelliteName = GetSatelliteName(obspar.obs_name, obspar.stationsurl) saa = np.empty(obspar.maxRuns + 1, dtype=bool) SatTimes = np.empty(obspar.maxRuns + 1, dtype=bool) # Iterate through time steps within the specified duration -> to be modified current_time = ObservationTime0 start_time = ObservationTime0 step = int(obspar.duration / obspar.maxRuns) step = datetime.timedelta(minutes=step) duration = obspar.duration SatTimes, saa = SAA_Times( duration, start_time, current_time, SatelliteName, saa, SatTimes, step, doPlot, dirName, obspar.datasetDir, obspar.SAAThreshold, ) i = 0 current_time = ObservationTime0 start_time = ObservationTime0 AvailablePixPerTime = [] TestTime = [] RadecsVsTimes = [] matching_tables = [] ProbaTime = [] while current_time <= start_time + datetime.timedelta(minutes=duration): # Need to get a list of highest pixels SatelliteTime = GetSatelliteTime(SatelliteName, current_time) satellitePosition, satelliteLocation = GetSatellitePositions( SatelliteName, SatelliteTime ) ObsBool, yprob, pixlistRROcc = OccultationCut( prob, reducedNside, current_time, obspar, satellitePosition, satelliteLocation, ) # Let's get the list of pixels available at each iteration firstvalue1 = first_values1 matching_rows1 = FindMatchingCoords(1, firstvalue1, pixlistRROcc, reducedNside) matching_tables.append(matching_rows1) radectime = co.SkyCoord( ra=matching_rows1["PIXRA"] * u.deg, dec=matching_rows1["PIXDEC"] * u.deg ) pix_idx = TransformRADecToPix(radectime, reducedNside) pix_proba = matching_rows1["PIXFOVPROB"] RadecsVsTimes.append(radectime) AvailablePixPerTime.append(pix_idx) TestTime.append(current_time) ProbaTime.append(pix_proba) # List of all cculted pixels Occultedpixels.append(pixlistRROcc) current_time += step i += 1 # WE CAN GET THE LIST OF PIXELS AVAILABLE AT ALL TIMES Occultedpixels = [item for sublist in Occultedpixels for item in sublist] OldPix = ipix searchpix = np.isin(OldPix, Occultedpixels, invert=True) newpix = OldPix[searchpix] # Find common pixels first_values = FindMatchingPixList(newpix, first_values1) ObsName = [obspar.obs_name for j in range(len(first_values))] RAarray = [row["PIXRA"] for row in first_values] DECarray = [row["PIXDEC"] for row in first_values] P_GWarray = [row["PIXFOVPROB"] for row in first_values] SuggestedPointings = Table( [ObsName, RAarray, DECarray, P_GWarray], names=["ObsName", "RA[deg]", "DEC[deg]", "PGW"], ) result = { "SatTimes": SatTimes, "saa": saa, "first_values1": first_values1, "first_values": first_values, "TestTime": TestTime, "AvailablePixPerTime": AvailablePixPerTime, "ProbaTime": ProbaTime, "Occultedpixels": Occultedpixels, } return SuggestedPointings, result
def PGalinFoV_Space_NObs( skymap, nameEvent, ObservationTime0, PointingFile, galFile, obsparameters, dirName ): """ Compute an observation schedule for space-based observatories using a 3D method. Called by :func:`tilepy.include.observationschedule.GetSchedule`, this function generates a schedule of pointings for each observatory, considering the field of view (FoV), galaxy catalog, and satellite position with occultation constraints. Parameters ---------- skymap : SkyMap The object storing sky maps. nameEvent : str The name of the event. ObservationTime0 : datetime The desired start time for scheduling to begin. PointingFile : str, optional Path to the text file containing the pointings that have already been performed. galFile : str Path to the galaxy catalog file. obsparameters : list of ObservationParameters A list of sets of parameters for each observatory needed to launch the tiling scheduler. dirName : str Path to the output directory where the schedules and plots will be saved. Returns ------- SuggestedPointings : astropy.table.Table Table of suggested pointings with their RA, DEC, and galaxy probability. SatTimes : numpy.ndarray Array of satellite observation times. saa : numpy.ndarray Array indicating the satellite's South Atlantic Anomaly (SAA) status at each time step. """ random.seed() RAarray = [] DECarray = [] pixlist = [] pixlistHR = [] ObsName = [] Occultedpixels = [] obspar = obsparameters[0] radius = obspar.FOV HRnside, reducedNside = GetBestNSIDE(obspar.reducedNside, obspar.HRnside, radius) ################################################################################################################################################# # Retrieve maps prob = skymap.getMap("prob", reducedNside) # Create table for 2D probability at 90% containment rapix, decpix, areapix = GetRegionPixReduced( prob, obspar.percentageMOC, reducedNside ) radecs = co.SkyCoord(rapix, decpix, frame="icrs", unit=(u.deg, u.deg)) maxRuns = obspar.maxRuns doPlot = obspar.doPlot # load galaxy catalogue if not obspar.mangrove: cat = LoadGalaxies(galFile) else: cat = LoadGalaxies_SteMgal(galFile) # correlate GW map with galaxy catalog, retrieve ordered list if not obspar.mangrove: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) sum_dP_dV = cat["dp_dV"].sum() else: cat = skymap.computeGalaxyProbability(cat) tGals0 = FilterGalaxies(cat, obspar.minimumProbCutForCatalogue) tGals0 = MangroveGalaxiesProbabilities(tGals0) sum_dP_dV = cat["dp_dV"].sum() # Add observed pixels to pixlist if PointingFile is not None: print(PointingFile, prob, reducedNside, radius, pixlist) pixlist, pixlistHR, sumPGW, doneObs = SubstractPointings2D( PointingFile, prob, obspar, pixlist, pixlistHR ) if obspar.countPrevious: maxRuns = obspar.maxRuns - doneObs logger.info( "===========================================================================================" ) logger.info(f"Total GW probability already covered: {sumPGW}") logger.info( f"Count Previous = {obspar.countPrevious}, Number of pointings already done: {doneObs}, " f"Max Runs was {obspar.maxRuns}, now is {maxRuns}" ) logger.info( "===========================================================================================" ) ipix = TransformRADecToPix(radecs, reducedNside) newpix = ipix pixradec = radecs first_values1 = GetBestGridPos3D( prob, tGals0, pixradec, newpix, radius, sum_dP_dV, HRnside, obspar.numberSides, maxRuns, doPlot, dirName, reducedNside, Occultedpixels, obspar.minProbcut, ) # FOR SPACE ###################################################### # Computing the satellite name SatelliteName = GetSatelliteName(obspar.obs_name, obspar.stationsurl) saa = np.empty(obspar.maxRuns + 1, dtype=bool) SatTimes = np.empty(obspar.maxRuns + 1, dtype=bool) # Iterate through time steps within the specified duration current_time = ObservationTime0 start_time = ObservationTime0 step = int(obspar.duration / obspar.maxRuns) step = datetime.timedelta(minutes=step) duration = obspar.duration SatTimes, saa = SAA_Times( duration, start_time, current_time, SatelliteName, saa, SatTimes, step, doPlot, dirName, obspar.datasetDir, obspar.SAAThreshold, ) i = 0 current_time = ObservationTime0 start_time = ObservationTime0 AvailablePixPerTime = [] TestTime = [] RadecsVsTimes = [] matching_tables = [] ProbaTime = [] while current_time <= start_time + datetime.timedelta(minutes=duration): # Need to get a list of highest pixels SatelliteTime = GetSatelliteTime(SatelliteName, current_time) satellitePosition, satelliteLocation = GetSatellitePositions( SatelliteName, SatelliteTime ) ObsBool, yprob, pixlistRROcc = OccultationCut( prob, reducedNside, current_time, obspar, satellitePosition, satelliteLocation, ) # Let's get the list of pixels available at each iteration firstvalue1 = first_values1 matching_rows1 = FindMatchingCoords(1, firstvalue1, pixlistRROcc, reducedNside) matching_tables.append(matching_rows1) radectime = co.SkyCoord( ra=matching_rows1["PIXRA"] * u.deg, dec=matching_rows1["PIXDEC"] * u.deg ) theta = np.radians(90.0 - matching_rows1["PIXDEC"]) phi = np.radians(matching_rows1["PIXRA"]) # phi = longitude pix_idx = hp.ang2pix(reducedNside, theta, phi, nest=False) pix_proba = matching_rows1["PIXFOVPROB"] RadecsVsTimes.append(radectime) AvailablePixPerTime.append(pix_idx) TestTime.append(current_time) ProbaTime.append(pix_proba) # List of all cculted pixels Occultedpixels.append(pixlistRROcc) current_time += step i += 1 # WE CAN GET THE LIST OF PIXELS AVAILABLE AT ALL TIMES --> here we are getting them for all the 90% region... we can only get then for first value if we want Occultedpixels = [item for sublist in Occultedpixels for item in sublist] OldPix = ipix searchpix = np.isin(OldPix, Occultedpixels, invert=True) newpix = OldPix[searchpix] # CONVERTING newpix to angles on the coordinate grid pixradec = TransformPixToRaDec(newpix, reducedNside) # Finding the common radec between visible pixels and the grid first_values_coords = co.SkyCoord( ra=first_values1["PIXRA"], dec=first_values1["PIXDEC"], unit="deg" ) matching_rows = [] for coord in pixradec: sep = first_values_coords.separation(coord) matches = np.where(sep < 1e-2 * u.deg)[0] # adjust tolerance as needed matching_rows.extend(first_values1[matches]) if matching_rows: first_values = Table(rows=matching_rows, names=first_values1.colnames) else: logger.info("No coordinates matched within the tolerance.") first_values = Table(names=first_values1.colnames) # FOR TARGETED HERE TRY TO FIND OUT WHICH GALAXIES ARE IN THE VISIBLE PART. Then choose the highest 10 between nthem ObsName = [obspar.obs_name for j in range(len(first_values))] RAarray = [row["PIXRA"] for row in first_values] DECarray = [row["PIXDEC"] for row in first_values] P_Galarray = [row["PIXFOVPROB"] for row in first_values] SuggestedPointings = Table( [ObsName, RAarray, DECarray, P_Galarray], names=["ObsName", "RA[deg]", "DEC[deg]", "PGal"], ) result = { "SatTimes": SatTimes, "saa": saa, "first_values1": first_values1, "first_values": first_values, "TestTime": TestTime, "AvailablePixPerTime": AvailablePixPerTime, "ProbaTime": ProbaTime, "Occultedpixels": Occultedpixels, } return SuggestedPointings, result