Source code for frb.surveys.euclid

"""
Methods related to Euclid survey queries via astroquery.esa.euclid

Provides access to Euclid Q1 data release through the ESA Euclid Archive,
including photometry (VIS+NIR), morphology, and optional spectroscopy.
"""

import warnings
import signal
import os
import shutil
from contextlib import contextmanager
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.table import Table

try:
    from astroquery.esa.euclid import Euclid
    Euclid.ROW_LIMIT = -1  # Remove row limit for queries
except ImportError:
    print("Warning: You need to install astroquery to use the Euclid survey tools...")

from frb.surveys import surveycoord
from frb.surveys import catalog_utils


@contextmanager
def _query_timeout(timeout_seconds):
    """Context manager to enforce a hard timeout on blocking archive queries."""
    if timeout_seconds is None or timeout_seconds <= 0:
        yield
        return

    def _handler(signum, frame):
        raise TimeoutError(f"Euclid query timed out after {timeout_seconds} seconds")

    previous_handler = signal.getsignal(signal.SIGALRM)
    signal.signal(signal.SIGALRM, _handler)
    signal.setitimer(signal.ITIMER_REAL, float(timeout_seconds))
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous_handler)

# Define the data model for Euclid photometry
# Main source: catalogue.mer_catalogue (MER = Multi-band Extraction Reduction)
photom = {}
photom['Euclid'] = {}
Euclid_bands = ['VIS', 'J', 'H', 'Y']
# VIS is detection band, J/H/Y are NISP IR bands
# Map FRB standard filter names to Euclid database columns
photom['Euclid']['Euclid_VIS'] = 'flux_vis_sersic'
photom['Euclid']['Euclid_VIS_err'] = 'fluxerr_vis_sersic'
photom['Euclid']['Euclid_J'] = 'flux_j_sersic'
photom['Euclid']['Euclid_J_err'] = 'fluxerr_j_sersic'
photom['Euclid']['Euclid_H'] = 'flux_h_sersic'
photom['Euclid']['Euclid_H_err'] = 'fluxerr_h_sersic'
photom['Euclid']['Euclid_Y'] = 'flux_y_sersic'
photom['Euclid']['Euclid_Y_err'] = 'fluxerr_y_sersic'
photom['Euclid']['Euclid_ID'] = 'object_id'
photom['Euclid']['ra'] = 'right_ascension'
photom['Euclid']['dec'] = 'declination'

# Optional morphological parameters
photom['Euclid']['Euclid_ellipticity'] = 'ellipticity'
photom['Euclid']['Euclid_kron_radius'] = 'kron_radius'
photom['Euclid']['Euclid_segmentation_area'] = 'segmentation_area'

# Define the data model for Euclid spectroscopy (if available)
spectrom = {}
spectrom['Euclid'] = {}
spectrom['Euclid']['Euclid_spec_ID'] = 'source_id'
spectrom['Euclid']['Euclid_spec_z'] = 'redshift'
spectrom['Euclid']['ra'] = 'right_ascension'
spectrom['Euclid']['dec'] = 'declination'


[docs] class Euclid_Survey(surveycoord.SurveyCoord): """ Class to handle queries on the Euclid survey database. Queries the ESA Euclid Archive for photometry and optional spectroscopy from the Q1 (March 2025) public data release. Primary data source is the mer_catalogue table which contains VIS+NIR photometry and morphology. Args: coord (SkyCoord): Coordinate for surveying around radius (Angle): Search radius around the coordinate verbose (bool, optional): Verbosity flag Examples: >>> from astropy.coordinates import SkyCoord >>> from astropy import units as u >>> from frb.surveys.euclid import Euclid_Survey >>> coord = SkyCoord(ra=267.78*u.deg, dec=65.53*u.deg) >>> radius = 10*u.arcmin >>> survey = Euclid_Survey(coord, radius) >>> catalog = survey.get_catalog() """
[docs] def __init__(self, coord, radius, **kwargs): surveycoord.SurveyCoord.__init__(self, coord, radius, **kwargs) self.survey = 'Euclid'
[docs] def get_catalog(self, query_fields=None, timeout=120, check_spectra=False): """ Query Euclid for all objects within a given radius of input coordinates. Queries the mer_catalogue table for photometry and morphology. Optionally searches for associated spectroscopy if photometry is found. Args: query_fields (list, optional): List of column names to retrieve from database. If None, uses default set: object_id, right_ascension, declination, plus all photometric and morphological columns. timeout (float, optional): Query timeout in seconds. Default: 120 s. print_query (bool, optional): If True, print the ADQL query. Default: False. check_spectra (bool, optional): If True, check for associated spectra for photometric sources. Default: False. Returns: astropy.table.Table: Catalog of sources with standardized FRB column names. Includes ra, dec, and Euclid_{VIS,J,H,Y} magnitudes. Empty table if no sources found. """ # Default fields to query if query_fields is None: query_fields = [ 'object_id', 'right_ascension', 'declination', 'flux_vis_sersic', 'fluxerr_vis_sersic', 'flux_j_sersic', 'fluxerr_j_sersic', 'flux_h_sersic', 'fluxerr_h_sersic', 'flux_y_sersic', 'fluxerr_y_sersic', 'ellipticity', 'kron_radius', 'segmentation_area', 'vis_det', 'det_quality_flag' ] # Use cone_search for efficient regional query # Note: cone_search returns results with 'dist' column showing angular distance if self.verbose: print(f"Querying Euclid mer_catalogue around {self.coord.ra.deg:.4f}, {self.coord.dec.deg:.4f}") print(f"Search radius: {self.radius.to(u.arcmin).value:.2f} arcmin") # Perform cone search # By default, Euclid.cone_search uses table_name='catalogue.mer_catalogue' with _query_timeout(timeout): job = Euclid.cone_search( coordinate=self.coord, radius=self.radius, columns=query_fields, async_job=True ) # Get results with _query_timeout(timeout): photom_catalog = job.get_results() if photom_catalog is None or len(photom_catalog) == 0: self.catalog = Table() self.catalog.meta['radius'] = self.radius self.catalog.meta['survey'] = self.survey if self.verbose: print("No sources found in Euclid.") self.validate_catalog() return self.catalog # Clean up catalog - rename columns to FRB standard names self.catalog = catalog_utils.clean_cat(photom_catalog, photom['Euclid']) # Add metadata self.catalog.meta['radius'] = self.radius self.catalog.meta['survey'] = self.survey if self.verbose: print(f"Found {len(self.catalog)} sources in Euclid") # Optionally check for associated spectra # This is slow so only do if requested and if we have photometric sources to check if check_spectra: if self.verbose: print("Checking for associated spectra...") euclid_ids = self.catalog['Euclid_ID'] has_spec = self.spectra_exist(euclid_ids) self.catalog['Euclid_has_spectrum'] = has_spec return self.catalog
[docs] def spectra_exist(self, euclid_ids: list | np.ndarray | int): """ Check if spectroscopic data exists for photometric sources. Queries the spectra_source table and attempts to match with photometric catalog. Only adds spectrum if found. Adds two columns to self.catalog: - Euclid_has_spectrum (bool) - Euclid_n_datalinks (int) This method queries Euclid datalinks using the Euclid object IDs in the catalog. """ euclid_ids = np.atleast_1d(euclid_ids) ncat = len(euclid_ids) if ncat>100: warnings.warn(f"Checking spectra for {ncat} sources may take a long time. Consider limiting to a smaller subset.") # Run a get_datalink query result = Euclid.get_datalinks(ids = euclid_ids) if result is not None and len(result) > 0: unique_ids = np.unique(result['ID']) unique_ids = [uid.replace('sedm ', '') for uid in unique_ids] # Clean up IDs has_spec = np.isin(euclid_ids, unique_ids) return has_spec
[docs] def get_spectrum(self, euclid_id, output_folder=None, timeout=120): """ Retrieve the spectrum for a given Euclid object ID. Args: euclid_id (int): The Euclid object ID to retrieve the spectrum for. output_folder (str, optional): Output folder for spectrum FITS. If None, uses temporary location. timeout (float, optional): Query timeout in seconds. Default: 120 s. Returns: tuple: (spectrum_data, spectrum_header) if spectrum found, (None, None) otherwise. """ try: if self.verbose: print(f"Attempting to retrieve spectrum for Euclid ID {euclid_id}") with _query_timeout(timeout): has_spec = self.spectra_exist([euclid_id])[0] if not has_spec: if self.verbose: print(f"No datalinks found for Euclid ID {euclid_id}") return None, None else: if self.verbose: print(f"Datalinks found for Euclid ID {euclid_id}.") # Download spectra if output_folder is None: output_folder = str(euclid_id) # Use ID as folder name to avoid collisions with _query_timeout(timeout): Euclid.get_spectrum(source_id=euclid_id, output_file=f'{output_folder}/spectrum.fits', verbose=False) except Exception as e: if self.verbose: print(f"Spectrum retrieval failed for Euclid ID {euclid_id}: {e}") return None, None
[docs] def get_cutout(self, imsize=None, output_file=None, verbose=None, timeout=120): """ Get a cutout of a Euclid MER background-subtracted mosaic image. Queries the mosaic_product table to find MER background-subtracted mosaics covering the target region, then retrieves a cutout. Args: imsize (Angle, optional): Size of cutout image. Default: 2 arcmin. output_file (str, optional): Output filename for cutout FITS. If None, uses temporary location. verbose (bool, optional): Verbosity. If None, uses self.verbose. Returns: tuple: (data_array, fits_header) if cutout successful, (None, None) otherwise. Note: MER cutouts are background-subtracted VIS-band mosaic images. Each ~1'x1' cutout weighs ~5.5 MB and takes <1 second to download. """ if verbose is None: verbose = self.verbose if imsize is None: imsize = 2 * u.arcmin if verbose: print(f"Attempting to retrieve Euclid cutout: {imsize.to(u.arcmin).value:.2f} arcmin") # Query for a VIS mosaic that intersects the requested region. # Euclid.get_cutout works on MER mosaics, so we fetch one matching file path. half_size = 0.5 * imsize radius_deg = half_size.to(u.deg).value query = ( "SELECT TOP 1 file_path, file_name, tile_index " "FROM q1.mosaic_product " "WHERE instrument_name='VIS' " f"AND INTERSECTS(CIRCLE({self.coord.ra.deg:.8f}, {self.coord.dec.deg:.8f}, {radius_deg:.8f}), fov)=1 " "ORDER BY creation_date DESC" ) if verbose: print(query) with _query_timeout(timeout): mosaic_job = Euclid.launch_job_async(query, verbose=False) with _query_timeout(timeout): mosaic_table = mosaic_job.get_results() if mosaic_table is None or len(mosaic_table) == 0: if verbose: print("No Euclid VIS mosaic found for requested region") self.cutout = None self.cutout_hdr = None return None, None file_path = f"{mosaic_table['file_path'][0]}/{mosaic_table['file_name'][0]}" tile_index = str(mosaic_table['tile_index'][0]) with _query_timeout(timeout): cutout_paths = Euclid.get_cutout( file_path=file_path, instrument='VIS', id=tile_index, coordinate=self.coord, radius=half_size, output_file=output_file, verbose=False ) if cutout_paths is None or len(cutout_paths) == 0: if verbose: print("Euclid.get_cutout returned no files") self.cutout = None self.cutout_hdr = None return None, None cutout_file = cutout_paths[0] with fits.open(cutout_file) as hdul: self.cutout = hdul[0].data self.cutout_hdr = hdul[0].header.copy() # Astroquery writes to a temporary path when output_file is not set. # We keep data in memory but remove that on-disk temp artifact. if output_file is None: try: if os.path.isfile(cutout_file): os.remove(cutout_file) parent_dir = os.path.dirname(cutout_file) if parent_dir and os.path.basename(parent_dir).startswith('temp_'): shutil.rmtree(parent_dir, ignore_errors=True) except OSError: pass self.cutout_size = imsize return self.cutout, self.cutout_hdr