Source code for eko.interpolation

"""Library providing all necessary tools for PDF interpolation.

It also provides the :class:`InterpolatorDispatcher` class to generate the interpolators.
Upon construction the dispatcher generates a number of functions
to evaluate the interpolator.
"""

import logging
import math
from typing import Sequence, Union

import numba as nb
import numpy as np
import numpy.typing as npt
import scipy.special as sp

logger = logging.getLogger(__name__)


class Area:
    """Define area of subgrid interpolators.

    Upon construction an array of coefficients is generated.

    Parameters
    ----------
    lower_index: int
        lower index of the area
    poly_number: int
        number of polynomial
    block: tuple(int, int)
        kmin and kmax
    xgrid: numpy.ndarray
        Grid in x-space from which the interpolators are constructed
    """

    def __init__(self, lower_index, poly_number, block, xgrid):
        # check range
        if poly_number < block[0] or block[1] < poly_number:
            raise ValueError(
                f"polynomial #{poly_number} cannot be a part of the block which "
                f"spans from {block[0]} to {block[1]}"
            )
        self.xmin = xgrid[lower_index]
        self.xmax = xgrid[lower_index + 1]
        self.poly_number = poly_number
        self.kmin = block[0]
        self.kmax = block[1]
        self.coefs = self._compute_coefs(xgrid)

    def _reference_indices(self):
        """Iterate over all indices which are part of the block."""
        for k in range(self.kmin, self.kmax + 1):
            if k != self.poly_number:
                yield k

    def _compute_coefs(self, xgrid):
        """Compute the coefficients for this area given a grid on :math:`x`."""
        denominator = 1.0
        coeffs = np.ones(1)
        xj = xgrid[self.poly_number]
        for s, k in enumerate(self._reference_indices()):
            xk = xgrid[k]
            denominator *= xj - xk
            Mxk_coeffs = -xk * coeffs
            coeffs = np.concatenate(([0.0], coeffs))
            coeffs[: s + 1] += Mxk_coeffs
        coeffs /= denominator
        return coeffs

    def __iter__(self):
        """Iterate the generated coefficients."""
        yield from self.coefs

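# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: on the toy grid
# [0, 0.5, 1] the degree-1 polynomial attached to the grid point x = 0.5
# (poly_number=1) has two areas; the one between 0.5 and 1 (lower_index=1,
# block=(1, 2)) collects the monomial coefficients of 2 - 2x.
_toy_grid = np.array([0.0, 0.5, 1.0])
_toy_area = Area(lower_index=1, poly_number=1, block=(1, 2), xgrid=_toy_grid)
# _toy_area.coefs -> array([ 2., -2.]), i.e. p(x) = 2 - 2 x, which is 1 at
# x = 0.5 and vanishes at x = 1, as a Lagrange polynomial should.
# ---------------------------------------------------------------------------
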
@nb.njit(cache=True)
def log_evaluate_Nx(N, logx, area_list):
    r"""Evaluate logarithmic interpolator in N-space.

    A single logarithmic Lagrange interpolator is evaluated in N-space
    multiplied by the Mellin-inversion factor.

    .. math::
        \tilde p(N)*\exp(- N * \ln(x))

    Parameters
    ----------
    N : complex
        Mellin variable
    logx : float
        logarithm of inversion point
    area_list : list
        area configuration of basis function

    Returns
    -------
    res : float
        kernel * inversion factor
    """
    res = 0.0
    for a in area_list:
        logxmin = a[0]
        logxmax = a[1]
        coefs = a[2:]
        # skip area completely?
        if logx >= logxmax or np.abs(logx - logxmax) < _atol_eps:
            continue
        umax = N * logxmax
        umin = N * logxmin
        emax = np.exp(N * (logxmax - logx))
        emin = np.exp(N * (logxmin - logx))
        for i, coef in enumerate(coefs):
            tmp = 0.0
            facti = math.gamma(i + 1) * pow(-1, i) / pow(N, i + 1)
            for k in range(i + 1):
                factk = 1.0 / math.gamma(k + 1)
                # this condition is actually not necessary in python since
                # there pow(0,0) == 1 and apparently this is inherited in
                # Numba/C, however this is mathematically cleaner
                if np.abs(umax) < _atol_eps and k == 0:
                    pmax = emax
                else:
                    pmax = pow(-umax, k) * emax
                # drop factor by analytics?
                if logx >= logxmin or np.abs(logx - logxmin) < _atol_eps:
                    pmin = 0
                else:
                    pmin = pow(-umin, k) * emin
                tmp += factk * (pmax - pmin)
            res += coef * facti * tmp
    return res

@nb.njit(cache=True)
def evaluate_Nx(N, logx, area_list):
    r"""Evaluate linear interpolator in N-space.

    A single linear Lagrange interpolator is evaluated in N-space
    multiplied by the Mellin-inversion factor.

    .. math::
        \tilde p(N)*\exp(- N * \ln(x))

    Parameters
    ----------
    N : complex
        Mellin variable
    logx : float
        logarithm of inversion point
    area_list : list
        area configuration of basis function

    Returns
    -------
    res : float
        basis function * inversion factor
    """
    res = 0.0
    for a in area_list:
        xmin = a[0]
        xmax = a[1]
        coefs = a[2:]
        lnxmax = np.log(xmax)
        # skip area completely?
        if logx >= lnxmax:
            continue
        for i, coef in enumerate(coefs):
            if xmin == 0.0:
                low = 0.0
            else:
                lnxmin = np.log(xmin)
                low = np.exp(N * (lnxmin - logx) + i * lnxmin)
            up = np.exp(N * (lnxmax - logx) + i * lnxmax)
            res += coef * (up - low) / (N + i)
    return res

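# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: for the hat function
# p(x) = 2x on [0, 0.5] and 2 - 2x on [0.5, 1] (area format: xmin, xmax,
# coefficients) evaluate_Nx returns \tilde p(N) * exp(-N logx); stripping the
# inversion factor again recovers the Mellin moment
# \int_0^1 x^{N-1} p(x) dx = 1/4 at N = 2.
_hat_areas = np.array([[0.0, 0.5, 0.0, 2.0], [0.5, 1.0, 2.0, -2.0]])
_N, _logx = 2.0, np.log(1e-3)
_moment = evaluate_Nx(_N, _logx, _hat_areas) * np.exp(_N * _logx)  # ≈ 0.25
# ---------------------------------------------------------------------------
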
@nb.njit(cache=True)
def evaluate_grid(N, is_log, logx, area_list):
    """Evaluate interpolator in N-space.

    Parameters
    ----------
    N : complex
        Mellin variable
    is_log : boolean
        is a logarithmic interpolation
    logx : float
        logarithm of inversion point
    area_list : list
        area configuration of basis function

    Returns
    -------
    pj : float
        basis function * inversion factor
    """
    if is_log:
        pj = log_evaluate_Nx(N, logx, area_list)
    else:
        pj = evaluate_Nx(N, logx, area_list)
    return pj

# TODO lift to runcard?
_atol_eps = 10 * np.finfo(float).eps

@nb.njit(cache=True)
def evaluate_x(x, area_list):
    """Evaluate a single linear Lagrange interpolator in x-space.

    .. math::
        p(x)

    Parameters
    ----------
    x : float
        interpolation point
    area_list : list
        area configuration of basis function

    Returns
    -------
    res : float
        basis function(x)
    """
    res = 0.0
    for j, a in enumerate(area_list):
        xmin = a[0]
        xmax = a[1]
        coefs = a[2:]
        if xmin < x <= xmax or (j == 0 and np.abs(x - xmin) < _atol_eps):
            for i, coef in enumerate(coefs):
                res += coef * pow(x, i)
            return res
    return res

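# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: the same hat function
# as in the sketch above, p(x) = 2x on [0, 0.5] and 2 - 2x on [0.5, 1],
# evaluated in x-space.
_hat_areas = np.array([[0.0, 0.5, 0.0, 2.0], [0.5, 1.0, 2.0, -2.0]])
_p_rise = evaluate_x(0.25, _hat_areas)  # -> 0.5, on the rising edge 2x
_p_node = evaluate_x(0.50, _hat_areas)  # -> 1.0, at the supporting grid point
_p_fall = evaluate_x(0.75, _hat_areas)  # -> 0.5, on the falling edge 2 - 2x
# ---------------------------------------------------------------------------
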
@nb.njit(cache=True)
def log_evaluate_x(x, area_list):
    """Evaluate a single logarithmic Lagrange interpolator in x-space.

    .. math::
        p(x)

    Parameters
    ----------
    x : float
        interpolation point
    area_list : list
        area configuration of basis function

    Returns
    -------
    res : float
        basis function(x)
    """
    x = np.log(x)
    return evaluate_x(x, area_list)

class BasisFunction:
    """Represent an element of the polynomial basis.

    It contains a list of areas for a given polynomial number, each defined
    by (xmin, xmax), which in turn contain a list of coefficients.

    Upon construction it generates all areas and compiles a function to
    evaluate the interpolator in N (or x) space.

    Parameters
    ----------
    xgrid : numpy.ndarray
        Grid in x-space from which the interpolators are constructed
    poly_number : int
        number of polynomial
    list_of_blocks: list(tuple(int, int))
        list of tuples with the (kmin, kmax) values for each area
    mode_log: bool
        use logarithmic interpolation?
    mode_N: bool
        if true compiles the function on N, otherwise compiles on x
    """

    def __init__(
        self,
        xgrid,
        poly_number,
        list_of_blocks,
        mode_log=True,
        mode_N=True,
    ):
        self.poly_number = poly_number
        self.areas = []
        self._mode_log = mode_log
        self.mode_N = mode_N

        # create areas
        for i, block in enumerate(list_of_blocks):
            if block[0] <= poly_number <= block[1]:
                new_area = Area(i, self.poly_number, block, xgrid)
                self.areas.append(new_area)
        if not self.areas:
            raise ValueError("Error: no areas were generated")
        self.areas_representation = self.areas_to_const()

        # compile
        # TODO move this to InterpolatorDispatcher
        self.callable = None
        if self.mode_N:
            self.compile_n()
        else:
            self.compile_x()

    def is_below_x(self, x):
        """Check whether all areas are below the specified threshold.

        Parameters
        ----------
        x : float
            reference value

        Returns
        -------
        is_below_x : bool
            xmax of highest area <= x?
        """
        # Log if needed
        if self._mode_log:
            x = np.log(x)
        # note that ordering is important!
        return self.areas[-1].xmax <= x

    def areas_to_const(self):
        """Return an array containing all areas.

        The area format is (`xmin`, `xmax`, `numpy.array` of coefficients).

        Returns
        -------
        numpy.ndarray
            area config
        """
        # This is necessary as numba will ask for everything
        # to be immutable
        area_list = []
        for area in self:
            area_list.append([area.xmin, area.xmax, *area.coefs])
        return np.array(area_list)

    def compile_x(self):
        """Compile the function to evaluate the interpolator in x space."""
        if self._mode_log:
            self.callable = log_evaluate_x
        else:
            self.callable = evaluate_x

    def evaluate_x(self, x):
        """Evaluate basis function in x-space (regardless of the true space).

        Parameters
        ----------
        x : float
            evaluated point

        Returns
        -------
        res : float
            p(x)
        """
        if self.mode_N:
            old_call = self.callable
            self.compile_x()
            res = self.callable(x, self.areas_representation)
            self.callable = old_call
        else:
            res = self.callable(x, self.areas_representation)
        return res

    def compile_n(self):
        r"""Compile the function to evaluate the interpolator in N space.

        Generates a function :meth:`evaluate_Nx` with a (N, logx) signature.

        .. math::
            \tilde p(N)*\exp(- N * \ln(x))

        The polynomials naturally contain factors of
        :math:`\exp(N * j * \ln(x_{min/max}))` which can be joined with the
        Mellin inversion factor.
        """
        if self._mode_log:
            self.callable = log_evaluate_Nx
        else:
            self.callable = evaluate_Nx

    def __iter__(self):
        yield from self.areas

    def __call__(self, *args, **kwargs):
        """Evaluate function."""
        args = list(args)
        args.append(self.areas_representation)
        return self.callable(*args, **kwargs)

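# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: build the basis
# function attached to the grid point x = 0.5 of a linear (degree-1),
# non-logarithmic interpolation on [0, 0.5, 1]. The blocks are given per
# area, as InterpolatorDispatcher below would construct them.
_bf = BasisFunction(
    np.array([0.0, 0.5, 1.0]),
    poly_number=1,
    list_of_blocks=[(0, 1), (1, 2)],
    mode_log=False,
    mode_N=False,
)
# _bf.evaluate_x(0.25) -> 0.5 and _bf.evaluate_x(0.75) -> 0.5: the familiar
# hat function, 2x on [0, 0.5] and 2 - 2x on [0.5, 1].
# ---------------------------------------------------------------------------
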
class XGrid:
    """Grid of points in :math:`x`-space.

    This object represents a suitable grid of momentum fractions to be used
    to evaluate the PDF over.
    """

    def __init__(self, xgrid: Union[Sequence, npt.NDArray], log: bool = True):
        ugrid = np.array(np.unique(xgrid), np.float_)
        if len(xgrid) != len(ugrid):
            raise ValueError(f"xgrid is not unique: {xgrid}")
        if len(xgrid) < 2:
            raise ValueError(f"xgrid needs at least 2 points, received {len(xgrid)}")

        self.log = log
        # henceforth ugrid might no longer be the input!
        # which is ok, because for most of the code this is all we need to do
        # to distinguish log and non-log
        if log:
            self._raw = ugrid
            ugrid = np.log(ugrid)
        self.grid = ugrid

    def __len__(self) -> int:
        return len(self.grid)

    def __eq__(self, other) -> bool:
        """Check equality."""
        # check shape before comparing values
        return len(self) == len(other) and np.allclose(self.raw, other.raw)

    @property
    def raw(self) -> np.ndarray:
        """Untransformed grid."""
        return self.grid if not self.log else self._raw

    @property
    def size(self) -> int:
        """Number of points."""
        return self.grid.size

    def tolist(self) -> list:
        """Raw grid as Python list."""
        return self.raw.tolist()

    def dump(self) -> dict:
        """Representation as dictionary."""
        return dict(grid=self.tolist(), log=self.log)

    @classmethod
    def load(cls, doc: dict):
        """Create object from dictionary."""
        return cls(doc["grid"], log=doc["log"])

    @classmethod
    def fromcard(cls, value: list, log: bool):
        """Create object from theory card config.

        The config can invoke other grid generation methods.
        """
        if len(value) == 0:
            raise ValueError("Empty xgrid!")

        if value[0] == "make_grid":
            xgrid = make_grid(*value[1:])
        elif value[0] == "lambertgrid":
            xgrid = lambertgrid(*value[1:])
        else:
            xgrid = np.array(value)

        return cls(xgrid, log=log)

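# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: by default XGrid
# stores the logarithms of the (unique, sorted) points and keeps the
# untransformed values accessible through `raw`; `dump`/`load` round-trip
# the configuration.
_xg = XGrid([1e-4, 1e-2, 1.0])
# _xg.raw  -> array([1.e-04, 1.e-02, 1.e+00])
# _xg.grid -> the natural logarithms of the points, since log=True by default
assert XGrid.load(_xg.dump()) == _xg
# ---------------------------------------------------------------------------
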
class InterpolatorDispatcher:
    """Setup the interpolator.

    Upon construction it will generate a list of :class:`BasisFunction`
    objects. Each of these :class:`BasisFunction` objects exposes a
    `callable` method (also accessible as the `__call__` method of the class)
    which will be numba-compiled.

    Parameters
    ----------
    xgrid : numpy.ndarray
        Grid in x-space from which the interpolators are constructed
    polynomial_degree : int
        degree of the interpolation polynomial
    log: bool
        Whether it is a log or linear interpolator
    mode_N: bool
        if true compiles the function on N, otherwise compiles on x
    """

    def __init__(
        self, xgrid: Union[XGrid, Sequence, npt.NDArray], polynomial_degree, mode_N=True
    ):
        if not isinstance(xgrid, XGrid):
            xgrid = XGrid(xgrid)
        # sanity checks
        if polynomial_degree < 1:
            raise ValueError(
                f"need at least polynomial_degree 1, received {polynomial_degree}"
            )
        if len(xgrid) <= polynomial_degree:
            raise ValueError(
                f"to interpolate with degree {polynomial_degree} "
                "we need at least that many points + 1"
            )
        # Save the different variables
        self.xgrid = xgrid
        self.polynomial_degree = polynomial_degree
        self.log = xgrid.log
        logger.info(
            "Interpolation: number of points = %d, polynomial degree = %d, logarithmic = %s",
            len(xgrid),
            polynomial_degree,
            xgrid.log,
        )

        # Create blocks
        list_of_blocks = []
        po2 = polynomial_degree // 2
        # if the degree is even, we cannot split the block symmetrically, e.g. deg=2 -> |-|-|
        # so, in case of doubt use the block which lies higher, i.e.
        # we're not allowed to go so deep -> make po2 smaller
        if polynomial_degree % 2 == 0:
            po2 -= 1
        # iterate areas: there is one less than the number of points
        for i in range(len(xgrid) - 1):
            kmin = max(0, i - po2)
            kmax = kmin + polynomial_degree
            if kmax >= len(xgrid):
                kmax = len(xgrid) - 1
                kmin = kmax - polynomial_degree
            b = (kmin, kmax)
            list_of_blocks.append(b)

        # Generate the basis functions
        basis_functions = []
        for i in range(len(xgrid)):
            new_basis = BasisFunction(
                xgrid.grid, i, list_of_blocks, mode_log=xgrid.log, mode_N=mode_N
            )
            basis_functions.append(new_basis)
        self.basis = basis_functions

    def __eq__(self, other):
        """Check equality."""
        checks = [
            self.log == other.log,
            self.polynomial_degree == other.polynomial_degree,
            self.xgrid == other.xgrid,
        ]
        return all(checks)

    def __iter__(self):
        # return iter(self.basis)
        yield from self.basis

    def __getitem__(self, item):
        return self.basis[item]

    def get_interpolation(self, targetgrid: Union[npt.NDArray, Sequence]):
        r"""Compute interpolation matrix between `targetgrid` and `xgrid`.

        .. math::
            f(targetgrid) = R \cdot f(xgrid)

        Parameters
        ----------
        targetgrid : array
            grid to interpolate to

        Returns
        -------
        R : array
            interpolation matrix :math:`R_{ij}`, where :math:`i` is the index
            over `targetgrid`, and :math:`j` is the index on the internal
            basis (the one stored in the :class:`InterpolatorDispatcher`
            instance)
        """
        # trivial?
        if len(targetgrid) == len(self.xgrid) and np.allclose(
            targetgrid, self.xgrid.raw
        ):
            return np.eye(len(self.xgrid))
        # compute map
        out = []
        for x in targetgrid:
            row = []
            for b in self.basis:
                row.append(b.evaluate_x(x))
            out.append(row)
        return np.array(out)

    def to_dict(self):
        """Return the configuration for the underlying xgrid.

        An instance can be constructed again with just the information
        returned by this method.

        Note
        ----
        The output dictionary contains a numpy array.

        Returns
        -------
        ret : dict
            full grid configuration
        """
        ret = {
            "xgrid": self.xgrid.dump(),
            "polynomial_degree": self.polynomial_degree,
            "is_log": self.log,
        }
        return ret

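# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: interpolate a toy
# function from the internal grid onto a finer target grid via the matrix R
# returned by get_interpolation (x-space mode, mode_N=False).
_xg_interp = XGrid(np.geomspace(1e-7, 1.0, 20))
_disp = InterpolatorDispatcher(_xg_interp, polynomial_degree=4, mode_N=False)
_target = np.geomspace(1e-6, 0.8, 25)
_R = _disp.get_interpolation(_target)  # shape (25, len(_xg_interp))
_f_grid = _xg_interp.raw * (1.0 - _xg_interp.raw)  # f(x) = x (1 - x) on xgrid
_f_target = _R @ _f_grid  # ≈ f evaluated on the target points
# ---------------------------------------------------------------------------
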
def make_grid(
    n_low, n_mid, n_high=0, x_min=1e-7, x_low=0.1, x_high=0.9, x_high_max=1.0 - 1e-4
):
    """Create a log-lin-log-spaced grid.

    1.0 is always part of the grid and the final grid is deduplicated, i.e.
    points that appear twice at the borders of the regions are merged.

    Parameters
    ----------
    n_low : int
        points in the small-x region
    n_mid : int
        points in the medium-x region
    n_high : int
        points in the large-x region
    x_min : float
        minimum x (included)
    x_low : float
        separation point between small and medium x
    x_high : float
        separation point between medium and large x
    x_high_max : float
        closest point before 1

    Returns
    -------
    xgrid : numpy.ndarray
        generated grid
    """
    # low
    if n_mid == 0:
        x_low = 1.0
    xgrid_low = np.geomspace(x_min, x_low, n_low)
    # high
    if n_high == 0:
        x_high = 1.0
        xgrid_high = np.array([])
    else:
        xgrid_high = 1.0 - np.geomspace(1 - x_high, 1 - x_high_max, n_high)
    # mid
    xgrid_mid = np.linspace(x_low, x_high, n_mid)
    # join
    xgrid = np.unique(np.concatenate((xgrid_low, xgrid_mid, xgrid_high, np.array([1]))))
    return xgrid

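# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: 3 log-spaced small-x
# points, 3 linear medium-x points, 3 points accumulating towards x = 1, and
# the point 1.0 itself (borders are deduplicated).
_xg_loglinlog = make_grid(3, 3, 3)
# -> approximately [1e-7, 1e-4, 0.1, 0.5, 0.9, 0.9968, 0.9999, 1.0]
# ---------------------------------------------------------------------------
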
def lambertgrid(n_pts, x_min=1e-7, x_max=1.0):
    r"""Create a smoothly spaced grid that is linear near 1 and logarithmic near 0.

    It is generated by the relation:

    .. math::
        x(y) = \frac{1}{5} W_{0}(5 \exp(5-y))

    where :math:`W_{0}` is the principal branch of the
    :func:`Lambert W function <scipy.special.lambertw>` and :math:`y` is a
    variable whose extremes are given as a function of :math:`x` by the
    direct relation:

    .. math::
        y(x) = 5(1-x)-\log(x)

    This method is implemented in `PineAPPL`, :cite:`Carrazza_2020`,
    eq. 2.11 and the surrounding paragraph.

    Parameters
    ----------
    n_pts : int
        number of points
    x_min : float
        minimum x (included)
    x_max : float
        maximum x (included)

    Returns
    -------
    xgrid : numpy.ndarray
        generated grid
    """

    def direct_relation(x):
        return 5 * (1 - x) - np.log(x)

    def inverse_relation(y):
        return np.real(1 / 5 * sp.lambertw(5 * np.exp(5 - y)))

    y_min = direct_relation(x_min)
    y_max = direct_relation(x_max)

    return np.array([inverse_relation(y) for y in np.linspace(y_min, y_max, n_pts)])

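# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: 5 Lambert-spaced
# points between the default x_min = 1e-7 and x_max = 1; the spacing is
# roughly logarithmic at small x and roughly linear towards x = 1.
_xg_lambert = lambertgrid(5)
# -> approximately [1e-7, 2.0e-5, 3.8e-3, 0.23, 1.0]
# ---------------------------------------------------------------------------
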