Source code for selectionfunctions.cog_ii

#!/usr/bin/env python
#
# cog_ii.py
# Reads the Gaia DR2 selection function from Completeness
# of the Gaia-verse Paper II, Boubert & Everall (2020).
#
# Copyright (C) 2020  Douglas Boubert & Andrew Everall
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import print_function, division

import os
import h5py
import numpy as np

import astropy.coordinates as coordinates
import astropy.units as units
import h5py
import healpy as hp
from scipy import interpolate, special

from .std_paths import *
from .map import SelectionFunction, ensure_flat_icrs, coord2healpix
from .source import ensure_gaia_g
from . import fetch_utils

from time import time


[docs]class dr2_sf(SelectionFunction): """ Queries the Gaia DR2 selection function (Boubert & Everall, 2019). """
[docs] def __init__(self, map_fname=None, version='modelAB', crowding=False, bounds=True): """ Args: map_fname (Optional[:obj:`str`]): Filename of the BoubertEverall2019 selection function. Defaults to :obj:`None`, meaning that the default location is used. version (Optional[:obj:`str`]): The selection function version to download. Valid versions are :obj:`'modelT'` and :obj:`'modelAB'` Defaults to :obj:`'modelT'`. crowding (Optional[:obj:`bool`]): Whether or not the selection function includes crowding. Defaults to :obj:`'False'`. bounds (Optional[:obj:`bool`]): Whether or not the selection function is bounded to 0.0 < G < 25.0. Defaults to :obj:`'True'`. """ if map_fname is None: map_fname = os.path.join(data_dir(), 'cog_ii', 'cog_ii_dr2.h5') t_start = time() with h5py.File(map_fname, 'r') as f: # Load auxilliary data print('Loading auxilliary data ...') self._g_grid = f['g_grid'][...] self._n_field = f['n_field'][...] self._nside = hp.npix2nside(self._n_field.shape[0]) self._crowding = crowding self._bounds = bounds if crowding == True: self._nside_crowding = 1024 self._log10_rho_grid = f['log10_rho_grid'][...] self._log10_rho_field = np.log10(np.maximum(1.0,f['neighbour_field'][...])/hp.nside2pixarea(self._nside_crowding,degrees=True)) t_auxilliary = time() # Load selection function print('Loading selection function ...') if version == 'modelT': if crowding == True: self._theta = f['t_theta_percentiles'][:,:,2] else: self._theta = f['t_theta_percentiles'][0,:,2] elif version == 'modelAB': if crowding == True: self._alpha = f['ab_alpha_percentiles'][:,:,2] self._beta = f['ab_beta_percentiles'][:,:,2] else: self._alpha = f['ab_alpha_percentiles'][0,:,2] self._beta = f['ab_beta_percentiles'][0,:,2] if bounds == True: self._g_min = 0.0 self._g_max = 25.0 else: self._g_min = -np.inf self._g_max = np.inf t_sf = time() # Create interpolator print('Creating selection function interpolator...') if version == 'modelT': if crowding == True: self._theta_interpolator = interpolate.RectBivariateSpline(self._log10_rho_grid,self._g_grid,self._theta) self._interpolator = lambda _log10_rho, _g : (self._theta_interpolator(_log10_rho, _g, grid = False),) else: self._theta_interpolator = interpolate.interp1d(self._g_grid,self._theta,kind='linear',fill_value='extrapolate') self._interpolator = lambda _g : (self._theta_interpolator(_g),) elif version == 'modelAB': if crowding == True: self._alpha_interpolator = interpolate.RectBivariateSpline(self._log10_rho_grid,self._g_grid,self._alpha) self._beta_interpolator = interpolate.RectBivariateSpline(self._log10_rho_grid,self._g_grid,self._beta) self._interpolator = lambda _log10_rho, _g : (self._alpha_interpolator(_log10_rho, _g, grid = False),self._beta_interpolator(_log10_rho, _g, grid = False)) else: self._alpha_interpolator = interpolate.interp1d(self._g_grid,self._alpha,kind='linear',fill_value='extrapolate') self._beta_interpolator = interpolate.interp1d(self._g_grid,self._beta,kind='linear',fill_value='extrapolate') self._interpolator = lambda _g : (self._alpha_interpolator(_g),self._beta_interpolator(_g)) t_interpolator = time() t_finish = time() print('t = {:.3f} s'.format(t_finish - t_start)) print(' auxilliary: {: >7.3f} s'.format(t_auxilliary-t_start)) print(' sf: {: >7.3f} s'.format(t_sf-t_auxilliary)) print('interpolator: {: >7.3f} s'.format(t_interpolator-t_sf))
def _selection_function(self,_n,_parameters): if len(_parameters) == 1: # This must be Model T, _parameters = (theta) _t = _parameters[0] # 0 < theta < 1, make it so! _t[_t<0.0] = 1e-6 _t[_t>1.0] = 1-1e-6 _result = special.betainc(5,_n-4,_t) elif len(_parameters) == 2: # This must be Model AB, _parameters = (alpha,beta) _a, _b = _parameters # 0.1 < alpha,beta < 10000, make it so! _a[_a<1e-1] = 1e-1 _a[_a>1e+4] = 1e+4 _b[_b<1e-1] = 1e-1 _b[_b>1e+4] = 1e+4 _result = np.ones(_a.shape) for _m in range(1,5)[::-1]: _result = 1.0 + _result*((_n-_m+1)/_m)*(_a+_m-1)/(_b+_n-_m) _result = 1.0 - _result*special.beta(_a,_b+_n)/special.beta(_a,_b) return _result
[docs] @ensure_flat_icrs @ensure_gaia_g def query(self, sources): """ Returns the selection function at the requested coordinates. Args: coords (:obj:`astropy.coordinates.SkyCoord`): The coordinates to query. Returns: Selection function at the specified coordinates, as a fraction. """ # Convert coordinates to healpix indices hpxidx = coord2healpix(sources.coord, 'icrs', self._nside, nest=True) # Calculate the number of observations of each source n = self._n_field[hpxidx] # Extract Gaia G magnitude G = sources.photometry.measurement['gaia_g'] if self._crowding == True: # Work out HEALPix index in crowding nside hpxidx_crowding = np.floor(hpxidx * hp.nside2npix(self._nside_crowding) / hp.nside2npix(self._nside)).astype(np.int) # Calculate the local density field at each source log10_rho = self._log10_rho_field[hpxidx_crowding] # Calculate parameters sf_parameters = self._interpolator(log10_rho,G) else: # Calculate parameters sf_parameters = self._interpolator(G) # Evaluate selection function selection_function = self._selection_function(n,sf_parameters) if self._bounds == True: _outside_bounds = np.where( (G<self._g_min) | (G>self._g_max) ) selection_function[_outside_bounds] = 0.0 return selection_function
[docs]class dr3_sf(dr2_sf):
[docs] def __init__(self, map_fname_dr3=None,map_fname_dr2=None, version='modelAB', crowding=False, bounds=True): if map_fname_dr3 is None: map_fname_dr3 = os.path.join(data_dir(), 'cog_ii', 'n_field_dr3.h5') dr2_sf.__init__(self, map_fname_dr2, version, crowding, bounds) with h5py.File(map_fname_dr3, 'r') as f: # Load auxilliary data print('Loading auxilliary data ...') self._n_field = f['n_field'][...] self._nside = hp.npix2nside(self._n_field.shape[0])
[docs]def fetch(): """ Downloads the specified version of the Bayestar dust map. Args: version (Optional[:obj:`str`]): The map version to download. Valid versions are :obj:`'bayestar2019'` (Green, Schlafly, Finkbeiner et al. 2019), :obj:`'bayestar2017'` (Green, Schlafly, Finkbeiner et al. 2018) and :obj:`'bayestar2015'` (Green, Schlafly, Finkbeiner et al. 2015). Defaults to :obj:`'bayestar2019'`. Raises: :obj:`ValueError`: The requested version of the map does not exist. :obj:`DownloadError`: Either no matching file was found under the given DOI, or the MD5 sum of the file was not as expected. :obj:`requests.exceptions.HTTPError`: The given DOI does not exist, or there was a problem connecting to the Dataverse. """ doi = {'dr2': '10.7910/DVN/PDFOVC', 'edr3_nfield': '10.7910/DVN/I3TGTS'} requirements = {'dr2': {'filename': 'cog_ii_dr2.h5'}, 'edr3_nfield':{'filename': 'n_field_dr3.h5'}} local_fname = os.path.join(data_dir(), 'cog_ii', requirements['dr2']['filename']) # Download the data fetch_utils.dataverse_download_doi( doi['dr2'], local_fname, file_requirements=requirements['dr2']) local_fname = os.path.join(data_dir(), 'cog_ii', requirements['edr3_nfield']['filename']) # Download the data fetch_utils.dataverse_download_doi( doi['edr3_nfield'], local_fname, file_requirements=requirements['edr3_nfield'])