Module mayo_hci.create_synthetic_data_with_disk_planet

Injects synthetic circumstellar signal into an empty ADI dataset

Expand source code
'''
Injects synthetic circumstellar signal into an empty ADI dataset
'''

'''
 MAYO pipeline, from Pairet et al. 2020
    Copyright (C) 2020, Benoit Pairet

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
'''

import vip_hci as vip
import numpy as np

import mayo_hci
import torch
import kornia

def create_synthetic_data_with_disk_planet(empty_data,angles,psf,add_synthetic_signal):
    """
    automatic_load_data(data_name,channel=0,dir='default',RDI=False,quick_look=0,crop=0,center_im=None)
        loads ADI datasets automatically 
    Parameters
    ----------
    data : numpy array
        t x n x n the empty (without circumstellar signal) ADI dataset
    angles :  numpy array
        list of angles
    psf :  numpy array
        n x n psf
    add_synthetic_signal
        tuple containing all the properties of the injected signal 
    Returns
    -------
    data : numpy array
        t x n x n, ADI dataset (empty_data + circumstellar signal injected)
    disk : numpy array
        n x n, the injected circumstellar signal (disk+planet) 
    """ 
    disk_max_intensity = add_synthetic_signal['disk_max_intensity']
    disk_inclination = add_synthetic_signal['disk_inclination']
    planets_positions_intensities = tuple(add_synthetic_signal['planets_positions_intensities'])
    if 'xdo' not in add_synthetic_signal:
        add_synthetic_signal['xdo'] = 0
    if 'ydo' not in add_synthetic_signal:
        add_synthetic_signal['ydo'] = 0
    if 'pa' not in add_synthetic_signal:
        add_synthetic_signal['pa'] = 80
    if 'omega' not in add_synthetic_signal:
        add_synthetic_signal['omega'] = 45
    if 'density_distribution' not in add_synthetic_signal:
        add_synthetic_signal['density_distribution'] = {'name':'2PowerLaws'}
    if 'phase_function' not in add_synthetic_signal:
        add_synthetic_signal['phase_function'] = {'name': 'HG', 'g': 0., 'polar': False}
    if 'disk_intensity_thresh' not in add_synthetic_signal:
        add_synthetic_signal['disk_intensity_thresh'] = 60
    t,n,_ = empty_data.shape
    kernel = np.fft.fft2(np.fft.fftshift(psf))
    # 
    # Disk injection
    # 
    my_disk = vip.metrics.scattered_light_disk.ScatteredLightDisk(nx=n,ny=n,xdo=add_synthetic_signal['xdo'], ydo=add_synthetic_signal['ydo'])
    my_disk.set_inclination(add_synthetic_signal['disk_inclination'])
    my_disk.set_pa(add_synthetic_signal['pa'])
    my_disk.set_omega(add_synthetic_signal['omega'])
    my_disk.set_flux_max(100)
    my_disk.set_density_distribution(add_synthetic_signal['density_distribution'])
    my_disk.set_phase_function(add_synthetic_signal['phase_function'])
    disk = my_disk.compute_scattered_light()

    disk = disk*(disk>add_synthetic_signal['disk_intensity_thresh'])

    disk = disk/np.max(disk)*disk_max_intensity

    # 
    # Planet injection
    # 
    if planets_positions_intensities:
        planet = np.zeros((n,n))
        for xx,yy,intensity in planets_positions_intensities:
            planet[xx,yy] = intensity
        disk += planet
    cube_disk = np.zeros((t,n,n))

    center: torch.tensor = torch.ones(1, 2)
    center[..., 0] = n / 2  # x
    center[..., 1] = n / 2  # yd
    scale: torch.tensor = torch.ones(1)

    torch_disk = torch.tensor([[disk]],requires_grad=False)

    for k in range(t):
        angle: torch.tensor = torch.ones(1) * (angles[k])
        M: torch.tensor = kornia.get_rotation_matrix2d(center, angle, scale)
        rotated_disk = kornia.warp_affine(torch_disk.float(), M, dsize=(n,n))
        cube_disk[k,:,:] = mayo_hci.A(rotated_disk[0,0,:,:].numpy(),kernel)

    data = empty_data + cube_disk
    return data, disk

Functions

def create_synthetic_data_with_disk_planet(empty_data, angles, psf, add_synthetic_signal)

automatic_load_data(data_name,channel=0,dir='default',RDI=False,quick_look=0,crop=0,center_im=None) loads ADI datasets automatically Parameters


data : numpy array
t x n x n the empty (without circumstellar signal) ADI dataset
angles :  numpy array
list of angles
psf :  numpy array
n x n psf
add_synthetic_signal
tuple containing all the properties of the injected signal

Returns

data : numpy array
t x n x n, ADI dataset (empty_data + circumstellar signal injected)
disk : numpy array
n x n, the injected circumstellar signal (disk+planet)
Expand source code
def create_synthetic_data_with_disk_planet(empty_data,angles,psf,add_synthetic_signal):
    """
    automatic_load_data(data_name,channel=0,dir='default',RDI=False,quick_look=0,crop=0,center_im=None)
        loads ADI datasets automatically 
    Parameters
    ----------
    data : numpy array
        t x n x n the empty (without circumstellar signal) ADI dataset
    angles :  numpy array
        list of angles
    psf :  numpy array
        n x n psf
    add_synthetic_signal
        tuple containing all the properties of the injected signal 
    Returns
    -------
    data : numpy array
        t x n x n, ADI dataset (empty_data + circumstellar signal injected)
    disk : numpy array
        n x n, the injected circumstellar signal (disk+planet) 
    """ 
    disk_max_intensity = add_synthetic_signal['disk_max_intensity']
    disk_inclination = add_synthetic_signal['disk_inclination']
    planets_positions_intensities = tuple(add_synthetic_signal['planets_positions_intensities'])
    if 'xdo' not in add_synthetic_signal:
        add_synthetic_signal['xdo'] = 0
    if 'ydo' not in add_synthetic_signal:
        add_synthetic_signal['ydo'] = 0
    if 'pa' not in add_synthetic_signal:
        add_synthetic_signal['pa'] = 80
    if 'omega' not in add_synthetic_signal:
        add_synthetic_signal['omega'] = 45
    if 'density_distribution' not in add_synthetic_signal:
        add_synthetic_signal['density_distribution'] = {'name':'2PowerLaws'}
    if 'phase_function' not in add_synthetic_signal:
        add_synthetic_signal['phase_function'] = {'name': 'HG', 'g': 0., 'polar': False}
    if 'disk_intensity_thresh' not in add_synthetic_signal:
        add_synthetic_signal['disk_intensity_thresh'] = 60
    t,n,_ = empty_data.shape
    kernel = np.fft.fft2(np.fft.fftshift(psf))
    # 
    # Disk injection
    # 
    my_disk = vip.metrics.scattered_light_disk.ScatteredLightDisk(nx=n,ny=n,xdo=add_synthetic_signal['xdo'], ydo=add_synthetic_signal['ydo'])
    my_disk.set_inclination(add_synthetic_signal['disk_inclination'])
    my_disk.set_pa(add_synthetic_signal['pa'])
    my_disk.set_omega(add_synthetic_signal['omega'])
    my_disk.set_flux_max(100)
    my_disk.set_density_distribution(add_synthetic_signal['density_distribution'])
    my_disk.set_phase_function(add_synthetic_signal['phase_function'])
    disk = my_disk.compute_scattered_light()

    disk = disk*(disk>add_synthetic_signal['disk_intensity_thresh'])

    disk = disk/np.max(disk)*disk_max_intensity

    # 
    # Planet injection
    # 
    if planets_positions_intensities:
        planet = np.zeros((n,n))
        for xx,yy,intensity in planets_positions_intensities:
            planet[xx,yy] = intensity
        disk += planet
    cube_disk = np.zeros((t,n,n))

    center: torch.tensor = torch.ones(1, 2)
    center[..., 0] = n / 2  # x
    center[..., 1] = n / 2  # yd
    scale: torch.tensor = torch.ones(1)

    torch_disk = torch.tensor([[disk]],requires_grad=False)

    for k in range(t):
        angle: torch.tensor = torch.ones(1) * (angles[k])
        M: torch.tensor = kornia.get_rotation_matrix2d(center, angle, scale)
        rotated_disk = kornia.warp_affine(torch_disk.float(), M, dsize=(n,n))
        cube_disk[k,:,:] = mayo_hci.A(rotated_disk[0,0,:,:].numpy(),kernel)

    data = empty_data + cube_disk
    return data, disk