Source code for atip.simulator

"""Module containing an interface with the AT simulator."""
import logging
from warnings import warn

import at
import numpy
import cothread
from scipy.constants import speed_of_light
from pytac.exceptions import FieldException


[docs]class ATSimulator(object): """A centralised class which makes use of AT to simulate the physics data for the copy of the AT lattice which it holds. It works as follows, when a change is made to the lattice in Pytac it is added to the queue attribute of this class. When the queue has changes on it a recalculation is triggered, all the changes are applied to the lattice and then the physics data calculated. This ensures that the physics data is up to date. **Attributes** Attributes: up_to_date (cothread.Event): A flag that indicates if the physics data is up to date with all the changes made to the AT lattice. .. Private Attributes: _at_lat (at.lattice_object.Lattice): The centralised instance of an AT lattice from which the physics data is calculated. _rp (numpy.array): A boolean array to be used as refpts for the physics calculations. _emit_calc (bool): Whether or not to perform the beam envelope based emittance calculations. _emitdata (tuple): Emittance, the output of the AT physics function ohmi_envelope (see at.lattice.radiation.py). _lindata (tuple): Linear optics data, the output of the AT physics function linopt (see at.lattice.linear.py). _radint (tuple): The 5 Synchrotron Integrals for uncoupled lattices. _queue (cothread.EventQueue): A queue of changes to be applied to the centralised lattice on the next recalculation cycle. _paused (cothread.Event): A flag used to temporarily pause the physics calculations. _calculation_thread (cothread.Thread): A thread to check the queue for new changes to the AT lattice and recalculate the physics data upon a change. """ def __init__(self, at_lattice, callback=None, emit_calc=True): """ .. Note:: To avoid errors, the physics data must be initially calculated here, during creation, otherwise it could be accidentally referenced before the attributes _emitdata and _lindata exist due to delay between class creation and the end of the first calculation in the thread. Args: at_lattice (at.lattice_object.Lattice): An instance of an AT lattice object. callback (callable): Optional, if passed it is called on completion of each round of physics calculations. emit_calc (bool): Whether or not to perform the beam envelope based emittance calculations. **Methods:** """ if (not callable(callback)) and (callback is not None): raise TypeError( "If passed, 'callback' should be callable, {0} is " "not.".format(callback) ) self._at_lat = at_lattice self._rp = numpy.ones(len(at_lattice) + 1, dtype=bool) self._emit_calc = emit_calc # Initial phys data calculation. if self._emit_calc: self._at_lat.radiation_on() self._emitdata = self._at_lat.ohmi_envelope(self._rp) self._at_lat.radiation_off() self._lindata = self._at_lat.linopt( refpts=self._rp, get_chrom=True, coupled=False ) self._radint = self._at_lat.get_radiation_integrals(0.0, self._lindata[3]) # Threading stuff initialisation. self._queue = cothread.EventQueue() # Explicitly manage the cothread Events, so turn off auto_reset. # These are False when reset, True when signalled. self._paused = cothread.Event(auto_reset=False) self.up_to_date = cothread.Event(auto_reset=False) self.up_to_date.Signal() self._calculation_thread = cothread.Spawn(self._recalculate_phys_data, callback)
[docs] def queue_set(self, func, field, value): """Add a change to the queue, to be applied when the queue is emptied. Args: func (callable): The function to be called to apply the change. field (str): The field to be changed. value (float): The value to be set. """ self._queue.Signal((func, field, value))
def _gather_one_sample(self): """If the queue is empty Wait() yields until an item is added. When the queue is not empty the oldest change will be removed and applied to the AT lattice. """ apply_change_method, field, value = self._queue.Wait() apply_change_method(field, value) def _recalculate_phys_data(self, callback): """Target function for the Cothread thread. Recalculates the physics data dependant on the status of the '_paused' flag and the length of the queue. The calculations only take place if '_paused' is False and there is one or more changes on the queue. .. Note:: If an error or exception is raised in the running thread then it does not continue running so subsequent calculations are not performed. To fix this we convert all errors raised inside the thread to warnings. Args: callback (callable): to be called after each round of calculations, indicating that they have concluded. Warns: at.AtWarning: any error or exception that was raised in the thread, but as a warning. """ while True: logging.debug("Starting recalculation loop") self._gather_one_sample() while self._queue: self._gather_one_sample() if bool(self._paused) is False: try: if self._emit_calc: logging.debug("Starting emittance calculation.") self._at_lat.radiation_on() self._emitdata = self._at_lat.ohmi_envelope(self._rp) logging.debug("Completed emittance calculation") logging.debug("Starting linear optics calculation.") self._at_lat.radiation_off() self._lindata = self._at_lat.linopt( 0.0, self._rp, True, coupled=False ) logging.debug("Completed linear optics calculation.") self._radint = self._at_lat.get_radiation_integrals( twiss=self._lindata[3] ) logging.debug("All calculation complete.") except Exception as e: warn(at.AtWarning(e)) # Signal up to date before the callback is executed in case # the callback requires data that requires the calculation # to be up to date. self.up_to_date.Signal() if callback is not None: logging.debug("Executing callback function.") callback() logging.debug("Callback completed.")
[docs] def toggle_calculations(self): """Pause or unpause the physics calculations by setting or clearing the _paused flag. N.B. this does not pause the emptying of the queue. """ if self._paused: self._paused.Reset() else: self._paused.Signal()
[docs] def pause_calculations(self): self._paused.Signal()
[docs] def unpause_calculations(self): self._paused.Reset()
[docs] def trigger_calculation(self): self.up_to_date.Reset() self.unpause_calculations() # Add a null item to the queue. A recalculation will happen # when it has been applied. self.queue_set(lambda *x: None, None, None)
[docs] def wait_for_calculations(self, timeout=10): """Wait until the physics calculations have taken account of all changes to the AT lattice, i.e. the physics data is fully up to date. Args: timeout (float, optional): The number of seconds to wait for. Returns: bool: False if the timeout elapsed before the calculations concluded, else True. """ try: self.up_to_date.Wait(timeout) return True except cothread.Timedout: return False
# Get lattice related data:
[docs] def get_at_element(self, index): """Return the AT element corresponding to the given index. Args: index (int): The index of the AT element to return. Returns: at.elements.Element: The element specified by the given index. """ return self._at_lat[index - 1]
[docs] def get_at_lattice(self): """Return a copy of the AT lattice object. Returns: at.lattice_object.Lattice: A copy of the AT lattice object. """ return self._at_lat.copy()
[docs] def get_s(self): """Return the s position of every element in the AT lattice Returns: list: The s position of each element. """ return list(self._lindata[3]["s_pos"][:-1])
[docs] def get_total_bend_angle(self): """Return the total bending angle of all the dipoles in the AT lattice. Returns: float: The total bending angle for the AT lattice. """ theta_sum = 0.0 for elem in self._at_lat: if isinstance(elem, at.lattice.elements.Dipole): theta_sum += elem.BendingAngle return numpy.degrees(theta_sum)
[docs] def get_total_absolute_bend_angle(self): """Return the total absolute bending angle of all the dipoles in the AT lattice. Returns: float: The total absolute bending angle for the AT lattice. """ theta_sum = 0.0 for elem in self._at_lat: if isinstance(elem, at.lattice.elements.Dipole): theta_sum += abs(elem.BendingAngle) return numpy.degrees(theta_sum)
[docs] def get_energy(self): """Return the energy of the AT lattice. Taken from the AT attribute. Returns: float: The energy of the AT lattice. """ return self._at_lat.energy
# Get global linear optics data:
[docs] def get_tune(self, field=None): """Return the tune for the AT lattice for the specified plane. .. Note:: A special consideration is made so only the fractional digits of the tune are returned. Args: field (str): The desired field (x or y) of tune, if None return both tune dimensions. Returns: float: The x or y tune for the AT lattice. Raises: FieldException: if the specified field is not valid for tune. """ if field is None: return numpy.array(self._lindata[1]) % 1 elif field == "x": return self._lindata[1][0] % 1 elif field == "y": return self._lindata[1][1] % 1 else: raise FieldException("Field {0} is not a valid tune plane.".format(field))
[docs] def get_chromaticity(self, field=None): """Return the chromaticity for the AT lattice for the specified plane. Args: field (str): The desired field (x or y) of chromaticity, if None return both chromaticity dimensions. Returns: float: The x or y chromaticity for the AT lattice. Raises: FieldException: if the specified field is not valid for chromaticity. """ if field is None: return self._lindata[2] elif field == "x": return self._lindata[2][0] elif field == "y": return self._lindata[2][1] else: raise FieldException( "Field {0} is not a valid chromaticity " "plane.".format(field) )
# Get local linear optics data:
[docs] def get_orbit(self, field=None): """Return the closed orbit at each element in the AT lattice for the specified plane. Args: field (str): The desired field (x, px, y, or py) of closed orbit, if None return whole orbit vector. Returns: numpy.array: The x, x phase, y or y phase for the AT lattice as an array of floats the length of the AT lattice. Raises: FieldException: if the specified field is not valid for orbit. """ if field is None: return self._lindata[3]["closed_orbit"][:-1] elif field == "x": return self._lindata[3]["closed_orbit"][:-1, 0] elif field == "px": return self._lindata[3]["closed_orbit"][:-1, 1] elif field == "y": return self._lindata[3]["closed_orbit"][:-1, 2] elif field == "py": return self._lindata[3]["closed_orbit"][:-1, 3] else: raise FieldException( "Field {0} is not a valid closed orbit plane.".format(field) )
[docs] def get_dispersion(self, field=None): """Return the dispersion at every element in the AT lattice for the specified plane. Args: field (str): The desired field (x, px, y, or py) of dispersion, if None return whole dispersion vector. Returns: numpy.array: The eta x, eta prime x, eta y or eta prime y for the AT lattice as an array of floats the length of the AT lattice. Raises: FieldException: if the specified field is not valid for dispersion. """ if field is None: return self._lindata[3]["dispersion"][:-1] elif field == "x": return self._lindata[3]["dispersion"][:-1, 0] elif field == "px": return self._lindata[3]["dispersion"][:-1, 1] elif field == "y": return self._lindata[3]["dispersion"][:-1, 2] elif field == "py": return self._lindata[3]["dispersion"][:-1, 3] else: raise FieldException( "Field {0} is not a valid dispersion plane.".format(field) )
[docs] def get_alpha(self): """Return the alpha vector at every element in the AT lattice. Returns: numpy.array: The alpha vector for each element. """ return self._lindata[3]["alpha"][:-1]
[docs] def get_beta(self): """Return the beta vector at every element in the AT lattice. Returns: numpy.array: The beta vector for each element. """ return self._lindata[3]["beta"][:-1]
[docs] def get_mu(self): """Return mu at every element in the AT lattice. Returns: numpy.array: The mu array for each element. """ return self._lindata[3]["mu"][:-1]
[docs] def get_m44(self): """Return the 4x4 transfer matrix for every element in the AT lattice. Returns: numpy.array: The 4x4 transfer matrix for each element. """ return self._lindata[3]["m44"][:-1]
# Get lattice emittance from beam envelope:
[docs] def get_emittance(self, field=None): """Return the emittance for the AT lattice for the specified plane. .. Note:: The emittance at the entrance of the AT lattice as it is constant throughout the lattice, and so which element's emittance is returned is arbitrary. Args: field (str): The desired field (x or y) of emittance, if None return both emittance dimensions. Returns: float: The x or y emittance for the AT lattice. Raises: FieldException: if the specified field is not valid for emittance. """ if field is None: return self._emitdata[0]["emitXY"] elif field == "x": return self._emitdata[0]["emitXY"][0] elif field == "y": return self._emitdata[0]["emitXY"][1] else: raise FieldException( "Field {0} is not a valid emittance plane.".format(field) )
# Get lattice data from radiation integrals:
[docs] def get_radiation_integrals(self): """Return the 5 Synchrotron Integrals for the AT lattice. Returns: numpy.array: The 5 radiation integrals. """ return numpy.asarray(self._radint)
[docs] def get_momentum_compaction(self): """Return the linear momentum compaction factor for the AT lattice. Returns: float: The linear momentum compaction factor of the AT lattice. """ I1, _, _, _, _ = self._radint return I1 / self._lindata[3]["s_pos"][-1]
[docs] def get_energy_spread(self): """Return the energy spread for the AT lattice. Returns: float: The energy spread for the AT lattice. """ _, I2, I3, I4, _ = self._radint gamma = self.get_energy() / (at.physics.e_mass) return gamma * numpy.sqrt((at.physics.Cq * I3) / ((2 * I2) + I4))
[docs] def get_energy_loss(self): """Return the energy loss per turn of the AT lattice. Returns: float: The energy loss of the AT lattice. """ _, I2, _, _, _ = self._radint return (at.physics.Cgamma * I2 * self.get_energy() ** 4) / (2 * numpy.pi)
[docs] def get_damping_partition_numbers(self): """Return the damping partition numbers for the 3 normal modes. Returns: numpy.array: The damping partition numbers of the AT lattice. """ _, I2, _, I4, _ = self._radint Jx = 1 - (I4 / I2) Je = 2 + (I4 / I2) Jy = 4 - (Jx + Je) # Check they sum to 4, don't just assume Jy is 1. return numpy.asarray([Jx, Jy, Je])
[docs] def get_damping_times(self): """Return the damping times for the 3 normal modes. [tx, ty, tz] = (2*E0*T0)/(U0*[Jx, Jy, Jz]) [1] [1] A.Wolski; CERN Accelerator School, Advanced Accelerator Physics Course, Low Emittance Machines, Part 1: Beam Dynamics with Synchrotron Radiation; August 2013; eqn. 68 Returns: numpy.array: The damping times of the AT lattice. """ E0 = self.get_energy() U0 = self.get_energy_loss() T0 = self._at_lat.circumference / speed_of_light return (2 * T0 * E0) / (U0 * self.get_damping_partition_numbers())
[docs] def get_linear_dispersion_action(self): """Return the Linear Dispersion Action ("curly H") for the AT lattice. Returns: float: Curly H for the AT lattice """ _, I2, _, _, I5 = self._radint return I5 / I2
[docs] def get_horizontal_emittance(self): """Return the horizontal emittance for the AT lattice calculated from the radiation integrals, as opposed to the beam envelope formalism used by AT's ohmi_envelope function. Returns: float: The horizontal ('x') emittance for the AT lattice. """ _, I2, _, I4, I5 = self._radint gamma = self.get_energy() / (at.physics.e_mass) return (I5 * at.physics.Cq * gamma ** 2) / (I2 - I4)