Source code for toppra.interpolator

"""
Path Interpolator
-----------------------

This module implements clases to represent geometric paths and
trajectories.

SplineInterplator
^^^^^^^^^^^^^^^^^^^
.. autoclass:: toppra.SplineInterpolator
   :members: __call__, dof, path_interval, waypoints

RaveTrajectoryWrapper
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: toppra.RaveTrajectoryWrapper
   :members: __call__, dof, path_interval, waypoints

simplepath.SimplePath
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: toppra.simplepath.SimplePath
   :members: __call__, dof, path_interval

[abstract]AbstractGeometricPath
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: toppra.interpolator.AbstractGeometricPath
   :members: __call__, dof, path_interval, waypoints

[internal]Methods
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autofunction:: toppra.interpolator.propose_gridpoints


"""
from typing import List, Union
import logging
import numpy as np
from scipy.interpolate import UnivariateSpline, CubicSpline, PPoly
from toppra.utils import deprecated
from toppra.constants import FOUND_OPENRAVE

logger = logging.getLogger(__name__)

if FOUND_OPENRAVE:
    import openravepy as orpy  # pylint: disable=import-error


[docs]def propose_gridpoints( path, max_err_threshold=1e-4, max_iteration=100, max_seg_length=0.05, min_nb_points=100 ): r"""Generate gridpoints that sufficiently cover the given path. This function operates in multiple passes through the geometric path from the start to the end point. In each pass, for each segment, the maximum interpolation error is estimated using the following equation: .. math:: err_{est} = 0.5 * \mathrm{max}(\mathrm{abs}(p'' * d_{segment} ^ 2)) Here :math:`p''` is the second derivative of the path and d_segment is the length of the segment. If the estimated error :math:`err_{test}` is greater than the given threshold `max_err_threshold` then the segment is divided in two half. Intuitively, at positions with higher curvature, there must be more points in order to improve approximation quality. Theoretically toppra performs the best when the proposed gridpoint is optimally distributed. Arguments --------- path: :class:`AbstractGeometricPath` Input geometric path. max_err_threshold: float Maximum worstcase error thrshold allowable. max_iteration: int Maximum number of iterations. max_seg_length: float All segments length should be smaller than this value. Returns ---------- gridpoints_ept: np.ndarray(N,) The proposed gridpoints. """ gridpoints_ept = [path.path_interval[0], path.path_interval[1]] for iteration in range(max_iteration): add_new_points = False for idx in range(len(gridpoints_ept) - 1): gp_mid = 0.5 * (gridpoints_ept[idx] + gridpoints_ept[idx + 1]) if (gridpoints_ept[idx + 1] - gridpoints_ept[idx]) > max_seg_length: gridpoints_ept.append(gp_mid) add_new_points = True continue dist = gridpoints_ept[idx + 1] - gridpoints_ept[idx] max_err = np.max(np.abs(0.5 * path(gp_mid, 2) * dist ** 2)) if max_err > max_err_threshold: add_new_points = True gridpoints_ept.append(gp_mid) continue gridpoints_ept = sorted(gridpoints_ept) if not add_new_points: break while len(gridpoints_ept) < min_nb_points: new_pts = [] for idx in range(len(gridpoints_ept) - 1): gp_mid = 0.5 * (gridpoints_ept[idx] + gridpoints_ept[idx + 1]) new_pts.append(gp_mid) gridpoints_ept.extend(new_pts) gridpoints_ept = sorted(gridpoints_ept) if iteration == max_iteration - 1: raise ValueError("Unable to find a good gridpoint for this path.") return gridpoints_ept
[docs]class AbstractGeometricPath(object): """Abstract base class that represents geometric paths. Derive geometric paths classes should implement the below abstract methods. These methods are expected in different steps of the algorithm. """
[docs] def __call__(self, path_positions: Union[float, np.ndarray], order: int = 0) -> np.ndarray: """Evaluate the path at given positions. Parameters ---------- path_positions: float or np.ndarray Path positions to evaluate the interpolator. order: int Order of the evaluation call. - 0: position - 1: first-order derivative - 2: second-order derivative Returns ------- : The evaluated joint positions, velocity or accelerations. The shape of the result depends on the shape of the input, it is either (N, m) where N is the number of path positions and m is the number of degree-of-freedom, or (m,). """ raise NotImplementedError
@property def dof(self) -> int: """Return the degrees-of-freedom of the path.""" raise NotImplementedError @property def path_interval(self): """Return the starting and ending path positions. Returns ------- np.ndarray(2,) The starting and ending path positions. """ raise NotImplementedError @property def waypoints(self): """Tuple[ndarray, ndarray] or None: The path's waypoints if applicable. None otherwise.""" return None def eval(self, ss_sam: Union[float, np.ndarray]): """Evaluate the path values.""" return self.__call__(ss_sam, 0) def evald(self, ss_sam: Union[float, np.ndarray]): """Evaluate the path first-derivatives.""" return self.__call__(ss_sam, 1) def evaldd(self, ss_sam: Union[float, np.ndarray]): """Evaluate the path second-derivatives.""" return self.__call__(ss_sam, 2)
[docs]class RaveTrajectoryWrapper(AbstractGeometricPath): """An interpolator that wraps OpenRAVE's :class:`GenericTrajectory`. Only trajectories using quadratic interpolation or cubic interpolation are supported. The trajectory is represented as a piecewise polynomial. The polynomial could be quadratic or cubic depending the interpolation method used by the input trajectory object. """ @staticmethod def _extract_interpolation_method(spec): _interpolation = spec.GetGroupFromName("joint").interpolation if _interpolation not in ["quadratic", "cubic"]: raise ValueError( "This class only handles trajectories with quadratic or cubic interpolation" ) return _interpolation def __init__(self, traj, robot): # type: (orpy.RaveTrajectory, orpy.Robot) -> None """Initialize the Trajectory Wrapper. Parameters ---------- traj: An OpenRAVE joint trajectory. robot: An OpenRAVE robot. """ super(RaveTrajectoryWrapper, self).__init__() self.traj = traj spec = traj.GetConfigurationSpecification() self._dof = robot.GetActiveDOF() self._interpolation = self._extract_interpolation_method(spec) self._duration = traj.GetDuration() all_waypoints = traj.GetWaypoints(0, traj.GetNumWaypoints()).reshape( traj.GetNumWaypoints(), -1 ) valid_wp_indices = [0] ss_waypoints: List[float] = [0.0] for i in range(1, traj.GetNumWaypoints()): dt = spec.ExtractDeltaTime(all_waypoints[i]) if dt > 1e-5: # If delta is too small, skip it. valid_wp_indices.append(i) ss_waypoints.append(ss_waypoints[-1] + dt) self.ss_waypoints = np.array(ss_waypoints) n_waypoints = len(valid_wp_indices) def _extract_waypoints(order): return np.array( [ spec.ExtractJointValues( all_waypoints[i], robot, robot.GetActiveDOFIndices(), order ) for i in valid_wp_indices ] ) def _make_ppoly(): if n_waypoints == 1: waypoints = _extract_waypoints(0) pp_coeffs = np.zeros((1, 1, self.dof)) for idof in range(self.dof): pp_coeffs[0, 0, idof] = waypoints[0, idof] return PPoly(pp_coeffs, [0, 1]) if self._interpolation == "quadratic": waypoints = _extract_waypoints(0) waypoints_d = _extract_waypoints(1) waypoints_dd = [] for i in range(n_waypoints - 1): qdd = (waypoints_d[i + 1] - waypoints_d[i]) / ( self.ss_waypoints[i + 1] - self.ss_waypoints[i] ) waypoints_dd.append(qdd) waypoints_dd = np.array(waypoints_dd) # Fill the coefficient matrix for scipy.PPoly class pp_coeffs = np.zeros((3, n_waypoints - 1, self.dof)) for idof in range(self.dof): for iseg in range(n_waypoints - 1): pp_coeffs[:, iseg, idof] = [ waypoints_dd[iseg, idof] / 2, waypoints_d[iseg, idof], waypoints[iseg, idof], ] return PPoly(pp_coeffs, self.ss_waypoints) if self._interpolation == "cubic": waypoints = _extract_waypoints(0) waypoints_d = _extract_waypoints(1) waypoints_dd = _extract_waypoints(2) waypoints_ddd = [] for i in range(n_waypoints - 1): qddd = (waypoints_dd[i + 1] - waypoints_dd[i]) / ( self.ss_waypoints[i + 1] - self.ss_waypoints[i] ) waypoints_ddd.append(qddd) waypoints_ddd = np.array(waypoints_ddd) # Fill the coefficient matrix for scipy.PPoly class pp_coeffs = np.zeros((4, n_waypoints - 1, self.dof)) for idof in range(self.dof): for iseg in range(n_waypoints - 1): pp_coeffs[:, iseg, idof] = [ waypoints_ddd[iseg, idof] / 6, waypoints_dd[iseg, idof] / 2, waypoints_d[iseg, idof], waypoints[iseg, idof], ] return PPoly(pp_coeffs, self.ss_waypoints) raise ValueError("An error has occured. Unable to form PPoly.") self.ppoly = _make_ppoly() @deprecated def get_duration(self): """Return the path's duration.""" return self.duration @deprecated def get_dof(self): # type: () -> int """Return the path's dof.""" return self.dof @property def duration(self): """Return the duration of the path.""" return self._duration @property def path_interval(self): """Return the start and end points.""" return np.array([0, self._duration]) @property def dof(self): return self._dof
[docs] def __call__(self, ss_sam, order=0): if order == 0: return self.eval(ss_sam) if order == 1: return self.evald(ss_sam) if order == 2: return self.evaldd(ss_sam) raise ValueError("Order must be 0, 1 or 2.")
def eval(self, ss_sam): """Evalute path postition.""" return self.ppoly(ss_sam) def evald(self, ss_sam): """Evalute path velocity.""" return self.ppoly.derivative()(ss_sam) def evaldd(self, ss_sam): """Evalute path acceleration.""" return self.ppoly.derivative(2)(ss_sam)
[docs]class SplineInterpolator(AbstractGeometricPath): """Interpolate the given waypoints by cubic spline. This interpolator is implemented as a simple wrapper over scipy's CubicSpline class. Parameters ---------- ss_waypoints: np.ndarray(m,) Path positions of the waypoints. waypoints: np.ndarray(m, d) Waypoints. bc_type: optional Boundary conditions of the spline. Can be 'not-a-knot', 'clamped', 'natural' or 'periodic'. - 'not-a-knot': The most default option, return the most naturally looking spline. - 'clamped': First-order derivatives of the spline at the two end are clamped at zero. See scipy.CubicSpline documentation for more details. """ def __init__(self, ss_waypoints, waypoints, bc_type="not-a-knot"): super(SplineInterpolator, self).__init__() self.ss_waypoints = np.array(ss_waypoints) # type: np.ndarray self._q_waypoints = np.array(waypoints) # type: np.ndarray assert self.ss_waypoints.shape[0] == self._q_waypoints.shape[0] if len(ss_waypoints) == 1: def _1dof_cspl(s): try: ret = np.zeros((len(s), self.dof)) ret[:, :] = self._q_waypoints[0] except TypeError: ret = self._q_waypoints[0] return ret def _1dof_cspld(s): try: ret = np.zeros((len(s), self.dof)) except TypeError: ret = np.zeros(self.dof) return ret self.cspl = _1dof_cspl self.cspld = _1dof_cspld self.cspldd = _1dof_cspld else: self.cspl = CubicSpline(ss_waypoints, waypoints, bc_type=bc_type) self.cspld = self.cspl.derivative() self.cspldd = self.cspld.derivative()
[docs] def __call__(self, path_positions, order=0): if order == 0: return self.cspl(path_positions) if order == 1: return self.cspld(path_positions) if order == 2: return self.cspldd(path_positions) raise ValueError("Invalid order %s" % order)
@property def waypoints(self): """Tuple[np.ndarray, np.ndarray]: Return the waypoints. The first element is the positions, the second element is the array of waypoints. """ return self.ss_waypoints, self._q_waypoints @deprecated def get_duration(self): """Return the path's duration.""" return self.duration @property def duration(self): """Return the duration of the path.""" return self.ss_waypoints[-1] - self.ss_waypoints[0] @property def path_interval(self): """Return the start and end points.""" return np.array([self.ss_waypoints[0], self.ss_waypoints[-1]]) @deprecated def get_path_interval(self): """Return the path interval.""" return self.path_interval @property def dof(self): if np.isscalar(self._q_waypoints[0]): return 1 return self._q_waypoints[0].shape[0] def compute_rave_trajectory(self, robot): """Compute an OpenRAVE trajectory equivalent to this trajectory. Parameters ---------- robot: Openrave robot. Returns ------- trajectory: Equivalent openrave trajectory. """ traj = orpy.RaveCreateTrajectory(robot.GetEnv(), "") spec = robot.GetActiveConfigurationSpecification("cubic") spec.AddDerivativeGroups(1, False) spec.AddDerivativeGroups(2, True) traj.Init(spec) deltas = [0] for i in range(len(self.ss_waypoints) - 1): deltas.append(self.ss_waypoints[i + 1] - self.ss_waypoints[i]) if len(self.ss_waypoints) == 1: q = self(0) qd = self(0, 1) qdd = self(0, 2) traj.Insert(traj.GetNumWaypoints(), list(q) + list(qd) + list(qdd) + [0]) else: qs = self(self.ss_waypoints) qds = self(self.ss_waypoints, 1) qdds = self(self.ss_waypoints, 2) for (q, qd, qdd, dt) in zip(qs, qds, qdds, deltas): traj.Insert( traj.GetNumWaypoints(), q.tolist() + qd.tolist() + qdd.tolist() + [dt], ) return traj
class UnivariateSplineInterpolator(AbstractGeometricPath): """ Smooth given wayspoints by a cubic spline. This is a simple wrapper over `scipy.UnivariateSplineInterpolator` class. Parameters ---------- ss_waypoints: ndarray Path positions of the waypoints. waypoints: ndarray The waypoints. """ def __init__(self, ss_waypoints, waypoints): super(UnivariateSplineInterpolator, self).__init__() assert ss_waypoints[0] == 0, "First index must equals zero." self.ss_waypoints = np.array(ss_waypoints) self._q_waypoints = np.array(waypoints) if np.isscalar(waypoints[0]): self._dof = 1 else: self._dof = waypoints[0].shape[0] assert self.ss_waypoints.shape[0] == self._q_waypoints.shape[0] self.uspl = [] for i in range(self.dof): self.uspl.append( UnivariateSpline(self.ss_waypoints, self._q_waypoints[:, i]) ) self.uspld = [spl.derivative() for spl in self.uspl] self.uspldd = [spl.derivative() for spl in self.uspld] @property def dof(self): return self._dof @property def path_interval(self): """Return the path interval.""" return [self.ss_waypoints[0], self.ss_waypoints[-1]] def __call__(self, ss_sam, order=0): data = [] if order == 0: for spl in self.uspl: data.append(spl(ss_sam)) elif order == 1: for spl in self.uspld: data.append(spl(ss_sam)) elif order == 2: for spl in self.uspldd: data.append(spl(ss_sam)) return np.array(data).T def eval(self, ss_sam): """Return the path position.""" data = [] for spl in self.uspl: data.append(spl(ss_sam)) return np.array(data).T def evald(self, ss_sam): """Return the path velocity.""" data = [] for spl in self.uspld: data.append(spl(ss_sam)) return np.array(data).T def evaldd(self, ss_sam): """Return the path acceleration.""" data = [] for spl in self.uspldd: data.append(spl(ss_sam)) return np.array(data).T class PolynomialPath(AbstractGeometricPath): """ A class representing polynominal paths. If coeff is a 1d array, the polynomial's equation is given by .. math:: coeff[0] + coeff[1] s + coeff[2] s^2 + ... If coeff is a 2d array, the i-th joint position is the polynomial .. math:: coeff[i, 0] + coeff[i, 1] s + coeff[i, 2] s^2 + ... """ def __init__(self, coeff, s_start=0.0, s_end=1.0): # type: (np.ndarray, float, float) -> None """Initialize the polynomial path. Parameters ---------- coeff Coefficients of the polynomials. s_start Starting path position. s_end Ending path position. """ super(PolynomialPath, self).__init__() self.coeff = np.array(coeff) self.s_end = s_end self.s_start = s_start if np.isscalar(self.coeff[0]): self.poly = [np.polynomial.Polynomial(self.coeff)] self.coeff = self.coeff.reshape(1, -1) else: self.poly = [ np.polynomial.Polynomial(self.coeff[i]) for i in range(self.dof) ] self.polyd = [poly.deriv() for poly in self.poly] self.polydd = [poly.deriv() for poly in self.polyd] def __call__(self, path_positions, order=0): if order == 0: return self.eval(path_positions) if order == 1: return self.evald(path_positions) if order == 2: return self.evaldd(path_positions) raise ValueError("Invalid order %s" % order) @property def dof(self): return self.coeff.shape[0] @property def duration(self): """Return the duration of the path.""" return self.s_end - self.s_start @property def path_interval(self): return np.array([self.s_start, self.s_end]) @deprecated def get_path_interval(self): """Return the path interval.""" return self.path_interval @deprecated def get_duration(self): """Return the path duration.""" return self.duration @deprecated def get_dof(self): """Return the path's dof.""" return self.dof def eval(self, ss_sam): """Return the path position.""" res = [poly(np.array(ss_sam)) for poly in self.poly] if self.dof == 1: return np.array(res).flatten() return np.array(res).T def evald(self, ss_sam): """Return the path velocity.""" res = [poly(np.array(ss_sam)) for poly in self.polyd] if self.dof == 1: return np.array(res).flatten() return np.array(res).T def evaldd(self, ss_sam): """Return the path acceleration.""" res = [poly(np.array(ss_sam)) for poly in self.polydd] if self.dof == 1: return np.array(res).flatten() return np.array(res).T