Source code for spqr

# Marco Riggirello & Antoine Venturini
from tensorflow import expand_dims, reshape, Module
from tensorflow.python.keras.layers import Layer, Dense, Activation
from tensorflow.python.keras.activations import softmax, softplus
from tensorflow_probability.python.bijectors import RealNVP, Chain, RationalQuadraticSpline


[docs]class SplineBlock(Layer): """ `tf.keras` layers block used for learning parameters (knots) of rational quadratic splines. Inherits from :class: `tensorflow.keras.layers.Layer`. :param nunits: Number of splines. :type nunits: int :param nbins: Number of bins for each spline. Note that the total number of spline parameters is `3*nbins - 1`: `nbins` for x and y bin coordinates respectively and `nbins - 1` for slopes. :type nbins: int :param border: The border of the splines. Spline bins are defined in the interval [-border, border], outside the relation between x and y is `y=x`. :type border: float :param hidden_layers: Dimensions of each dense layer, defaults to `[512, 512]`. :type hidde_layers: list[int], optional :param min_bin_gap: Minimum distance between subsequent bins, defaults to `1e-3`. :type min_bin_gap: float, optional :param min_slope: Mimimum spline slope in each bin, defaults to `1e-3`. :type min_slope: float, optional """
[docs] def __init__(self, nunits, nbins, border, hidden_layers=[512,512], min_bin_gap=1e-3, min_slope=1e-3): """ Constructor method. """ super().__init__(name="spline_block") self._nunits = nunits self._nbins = nbins self._nslopes = nbins - 1 self._border = border self._min_bin_gap = min_bin_gap self._min_slope = min_slope self._hidden_layers = [ Dense(n,activation="relu",name=f"spqr_nn_layer_{i}") for i,n in enumerate(hidden_layers) ] self._widths_layer = Dense(self._nunits * self._nbins, name="widths_layer") self._heights_layer = Dense(self._nunits * self._nbins, name="heights_layer") self._slopes_layer = Dense(self._nunits * self._nslopes, name="slopes_layer")
[docs] def call(self, units): """ Returns the units tensor transformed by the neural network. :param units: Input tensor. :type units: tensorflow.Tensor :return: One tensor for bin x coordinates (widths), one for y coordinates (heights) and one for slopes. :rtype: tensorflow.Tensor """ if units.shape.rank == 1: units = expand_dims(units, axis=0) adjust_rank = lambda x: x[0] else: adjust_rank = lambda x: x for layer in self._hidden_layers: units = layer(units) widths = adjust_rank(self._widths_layer(units)) widths = reshape(widths, widths .shape[:-1] .concatenate((self._nunits, self._nbins))) widths = Activation(softmax)(widths) widths = widths * (2 * self._border - self._nbins * self._min_bin_gap ) - self._min_bin_gap heights = adjust_rank(self._heights_layer(units)) heights = reshape(heights, heights .shape[:-1] .concatenate((self._nunits, self._nbins))) heights = Activation(softmax)(heights) heights = heights * (2 * self._border - self._nbins * self._min_bin_gap ) - self._min_bin_gap slopes = adjust_rank(self._slopes_layer(units)) slopes = reshape(slopes, slopes .shape[:-1] .concatenate((self._nunits, self._nslopes))) slopes = Activation(softplus)(slopes) slopes = slopes + self._min_slope return widths, heights, slopes
[docs]class SplineInitializer(Module): """ Creates a rational quadratic spline with trainable parameters. :param nbins: Number of spline bins, defaults to 128. :type nbins: int, optional :param border: Spline border, defaults to 4. :type border: float, optional :param hidden_layers: Dimensions of each dense layer, defaults to `[512, 512]`. :type hidde_layers: list[int], optional :param min_bin_gap: Minimum distance between subsequent bins, defaults to `1e-3`. :type min_bin_gap: float, optional :param min_slope: Mimimum spline slope in each bin, defaults to `1e-3`. :type min_slope: float, optional .. note:: For more informations about rational quadratic spline see `the original article <https://arxiv.org/abs/1906.04032>`_ by Durkan et al. """
[docs] def __init__(self, nbins=128, border=4, hidden_layers=[512,512], min_bin_gap=1e-3, min_slope=1e-3): """ Constructor method. """ super().__init__() self._nbins = nbins self._border = border self._min_bin_gap = min_bin_gap self._min_slope = min_slope self._hidden_layers = hidden_layers self._built = False
[docs] def __call__(self, x, nunits): """ Returns a rational quadratic spline with learnable parameters. :param x: The spline input. :type x: tensorflow.Tensor :param nunits: Number of splines. :type nunits: int :return: Rational quadratic spline with learnable parameters. :rtype: tensorflow_probability.bijectors.RationalQuadraticSpline """ if not self._built: self._nn = SplineBlock(nunits, self._nbins, self._border, hidden_layers=self._hidden_layers, min_bin_gap=self._min_bin_gap, min_slope=self._min_slope) self._built = True widths, heights, slopes = self._nn(x) return RationalQuadraticSpline(widths, heights, slopes, range_min= -self._border)
[docs]class NeuralSplineFlow(Chain): """ Neural Spline Flow bijector. This is a coupling layer type bijector with rational quadratic spline acting as transformer. The coupling layer architecture can be defined as a list of masks (or a number of splits) that decides which variable is conditioned and which is the conditioner in each layer (or better, in each transformation step). Suppose we want to transform 3 variables using the mask [1,-1], where the negative number indicates the second part of the split as conditioner. Two coupling layers will be defined: the first one maps the feature :math:`x_1` to itself and acts on features :math:`x_0` and :math:`x_2`. A second coupling layer acts on these transformed variables :math:`x_0^\prime, x_1, x_2^\prime` masking the feature :math:`x_{-1}`, i.e. :math:`x_0` and trnsforming the others. The user may want to specify only in how many chunks he wants to split the number of variables. For this case the `splits` paramenter is defined: a corresponding number of coupling layers is created, where the *j-th* layer has a fration *j / nsplit* of input features masked. .. note:: See `the RealNVP documentation <https://www.tensorflow.org/probability/api_docs/python/tfp/bijectors/RealNVP>`_ for more infos. :param splits: number of splits for the variables. :type splits: int :param masks: list of masks for variables. :type masks: list[int] :param spline_params: dictionary of parameters for SplineInitializer. :type spline_params: dict :raises: ValueError """
[docs] def __init__(self, splits=None, masks=None, spline_params = {} ): """ Default constructor """ self._spline_params = spline_params self._splits = splits self._masks = masks if self._splits is not None and self._masks is None: if self._splits < 2: raise ValueError("splits must be greater than or equal to 2 ", "(You must split your feature vec in at least two parts).") realnvp_args = [ dict(fraction_masked=i/self._splits, bijector_fn=SplineInitializer(**self._spline_params)) for i in range(1-self._splits, self._splits) if i != 0 ] elif self._masks is not None and self._splits is None: realnvp_args = [ dict(num_masked=i, bijector_fn=SplineInitializer(**self._spline_params)) for i in self._masks ] else: raise ValueError("You must specify `splits` OR `masks`, not both.") self._coupling_layers = [ RealNVP(**splines, name=f"coupling_layer_{i}") for i, splines in enumerate(realnvp_args) ] super().__init__(bijectors=self._coupling_layers, name="nsf")