Source code for toppra.solverwrapper.hot_qpoases_solverwrapper

import logging

import numpy as np

from .solverwrapper import SolverWrapper
from ..constraint import ConstraintType
from ..constants import QPOASES_INFTY, TINY, SMALL
from ..exceptions import SolverNotFound


try:
    from qpoases import (
        PyOptions as Options,
        PyPrintLevel as PrintLevel,
        PyReturnValue as ReturnValue,
        PySQProblem as SQProblem,
    )

    qpoases_FOUND = True
except ImportError:
    qpoases_FOUND = False

logger = logging.getLogger(__name__)


class hotqpOASESSolverWrapper(SolverWrapper):
    """`qpOASES` solver wrapper with hot-start.

    This wrapper takes advantage of the warm-start capability of the
    qpOASES quadratic-programming solver. It keeps two internal QP
    solvers, one for the minimizing direction and one for the maximizing
    direction (both are needed when computing controllable sets), and
    selects between them by looking at the optimization direction.

    This solver wrapper also scales the problem data before invoking
    `qpOASES`. If the "toppra" logger is set to debug level, the qpOASES
    solvers are initialized with PrintLevel.HIGH; otherwise they are
    initialized with PrintLevel.NONE.

    Currently only Canonical Linear Constraints are supported.

    Parameters
    ----------
    constraint_list: :class:`.Constraint` []
        The constraints the robot is subjected to.
    path: :class:`.Interpolator`
        The geometric path.
    path_discretization: array
        The discretized path positions.
    disable_check: bool, optional
        Disable the check for solution validity. Improves speed by about
        20%, but entails the possibility that failures are not reported
        correctly.
    scaling_solverwrapper: bool, optional
        If True, scale the data of each optimization problem before
        solving. Important: currently scaling is always performed
        regardless of the value of this flag. To be fixed.
    """

    def __init__(
        self,
        constraint_list,
        path,
        path_discretization,
        disable_check=False,
        scaling_solverwrapper=True,
    ):
        if not qpoases_FOUND:
            raise SolverNotFound(
                "toppra is unable to find any installation of qpoases!"
            )
        super(hotqpOASESSolverWrapper, self).__init__(
            constraint_list, path, path_discretization
        )
        self._disable_check = disable_check
        # The first constraint row is x + 2 delta u <= x_next_max, the
        # second is x_next_min <= x + 2 delta u.
        self.nC = 2  # number of constraint rows
        for i, constraint in enumerate(constraint_list):
            if constraint.get_constraint_type() != ConstraintType.CanonicalLinear:
                raise NotImplementedError
            a, b, c, F, v, _, _ = self.params[i]
            if a is not None:
                if constraint.identical:
                    self.nC += F.shape[0]
                else:
                    self.nC += F.shape[1]

        # qpOASES coefficient arrays:
        #   l  <=   var <= h
        #   lA <= A var <= hA
        self._A = np.zeros((self.nC, self.nV))
        self._lA = -np.ones(self.nC) * QPOASES_INFTY
        self._hA = np.ones(self.nC) * QPOASES_INFTY
        self._l = -np.ones(2) * QPOASES_INFTY
        self._h = np.ones(2) * QPOASES_INFTY
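
    # For reference, the scaling performed in ``solve_stagewise_optim``:
    # with S = diag(variable_scales), the problem actually handed to
    # qpOASES is
    #
    #     min  0.5 y^T (S H S) y + (S g)^T y
    #     s.t. lA <= (A S) y <= hA
    #          l / variable_scales <= y <= h / variable_scales
    #
    # (each constraint row is additionally rescaled by 1 / (1 + its
    # magnitude), which does not change the feasible set). The solution in
    # the original variables is recovered as x = S y, hence the
    # ``var * variable_scales`` in the return statements of
    # ``solve_stagewise_optim``.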
    def setup_solver(self):
        """Initiate two internal solvers for warm-start."""
        option = Options()
        if logger.getEffectiveLevel() == logging.DEBUG:
            option.printLevel = PrintLevel.HIGH
        else:
            option.printLevel = PrintLevel.NONE
        self.solver_minimizing = SQProblem(self.nV, self.nC)
        self.solver_minimizing.setOptions(option)
        self.solver_maximizing = SQProblem(self.nV, self.nC)
        self.solver_maximizing.setOptions(option)

        # -2 guarantees that the first call at any index is a cold start.
        self.solver_minimizing_recent_index = -2
        self.solver_maximizing_recent_index = -2
    def close_solver(self):
        self.solver_minimizing = None
        self.solver_maximizing = None
    def solve_stagewise_optim(self, i, H, g, x_min, x_max, x_next_min, x_next_max):
        assert 0 <= i <= self.N
        # Solve the scaled optimization problem:
        #   min  0.5 y^T scale H scale y + g^T scale y
        #   s.t. lA <= A scale y <= hA
        #        l  <=   scale y <= h

        self._l[:] = -QPOASES_INFTY
        self._h[:] = QPOASES_INFTY

        if x_min is not None:
            self._l[1] = max(self._l[1], x_min)
        if x_max is not None:
            self._h[1] = min(self._h[1], x_max)

        if i < self.N:
            delta = self.get_deltas()[i]
            if x_next_min is not None:
                self._A[0] = [-2 * delta, -1]
                self._hA[0] = -x_next_min
            else:
                self._A[0] = [0, 0]
                self._hA[0] = QPOASES_INFTY
            self._lA[0] = -QPOASES_INFTY

            if x_next_max is not None:
                self._A[1] = [2 * delta, 1]
                self._hA[1] = x_next_max
            else:
                self._A[1] = [0, 0]
                self._hA[1] = QPOASES_INFTY
            self._lA[1] = -QPOASES_INFTY

        cur_index = 2
        for j in range(len(self.constraints)):
            a, b, c, F, v, ubound, xbound = self.params[j]

            if a is not None:
                if self.constraints[j].identical:
                    nC_ = F.shape[0]
                    self._A[cur_index : cur_index + nC_, 0] = F.dot(a[i])
                    self._A[cur_index : cur_index + nC_, 1] = F.dot(b[i])
                    self._hA[cur_index : cur_index + nC_] = v - F.dot(c[i])
                    self._lA[cur_index : cur_index + nC_] = -QPOASES_INFTY
                else:
                    nC_ = F[i].shape[0]
                    self._A[cur_index : cur_index + nC_, 0] = F[i].dot(a[i])
                    self._A[cur_index : cur_index + nC_, 1] = F[i].dot(b[i])
                    self._hA[cur_index : cur_index + nC_] = v[i] - F[i].dot(c[i])
                    self._lA[cur_index : cur_index + nC_] = -QPOASES_INFTY
                cur_index = cur_index + nC_

            if ubound is not None:
                self._l[0] = max(self._l[0], ubound[i, 0])
                self._h[0] = min(self._h[0], ubound[i, 1])

            if xbound is not None:
                self._l[1] = max(self._l[1], xbound[i, 0])
                self._h[1] = min(self._h[1], xbound[i, 1])

        # If x_min == x_max, do not solve the 2D optimization problem;
        # instead, do a line search over u.
        if abs(x_min - x_max) < TINY and H is None and self.get_no_vars() == 2:
            logger.debug("x_min ({:f}) equals x_max ({:f})".format(x_min, x_max))
            u_min = -QPOASES_INFTY
            u_max = QPOASES_INFTY
            # Use a distinct loop variable so the stage index i is not shadowed.
            for row in range(self._A.shape[0]):
                if self._A[row, 0] > 0:
                    u_max = min(
                        u_max,
                        (self._hA[row] - self._A[row, 1] * x_min) / self._A[row, 0],
                    )
                elif self._A[row, 0] < 0:
                    u_min = max(
                        u_min,
                        (self._hA[row] - self._A[row, 1] * x_min) / self._A[row, 0],
                    )
            if (u_min - u_max) / abs(u_max) > SMALL:  # problem infeasible
                logger.debug(
                    "u_min > u_max by {:f}. Might not be critical. "
                    "Returning failure.".format(u_min - u_max)
                )
                return np.array([np.nan, np.nan])

            if g[0] < 0:
                return np.array([u_max, x_min + 2 * u_max * delta])
            else:
                return np.array([u_min, x_min + 2 * u_min * delta])

        if H is None:
            # Regularization: keep the Hessian positive semi-definite.
            # Very important.
            H = np.ones((self.get_no_vars(), self.get_no_vars())) * 1e-18

        # Column (variable) scaling; the maximum possible value of either
        # ratio is 1e5.
        ratio_col1 = 1 / (np.sum(np.abs(self._A[2:, 0])) + 1e-5)
        ratio_col2 = 1 / (np.sum(np.abs(self._A[2:, 1])) + 1e-5)
        variable_scales = np.array([ratio_col1, ratio_col2])
        variable_scales_mat = np.diag(variable_scales)

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "min ratio col 1 {:f}, col 2 {:f}".format(ratio_col1, ratio_col2)
            )

        # Column scaling
        self._A = self._A.dot(variable_scales_mat)
        self._l = self._l / variable_scales
        self._h = self._h / variable_scales
        self._g = g * variable_scales
        self._H = variable_scales_mat.dot(H).dot(variable_scales_mat)

        # Row scaling
        row_magnitude = np.sum(np.abs(self._A), axis=1)
        row_scaling_mat = np.diag((row_magnitude + 1) ** (-1))
        self._A = np.dot(row_scaling_mat, self._A)
        self._lA = np.dot(row_scaling_mat, self._lA)
        self._hA = np.dot(row_scaling_mat, self._hA)

        return_value, var = self._solve_optimization(i)

        if return_value == ReturnValue.SUCCESSFUL_RETURN:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("optimal value: {:}".format(var))

            if self._disable_check:
                return var * variable_scales

            # Check for constraint feasibility
            success = (
                np.all(self._l <= var + TINY)
                and np.all(var <= self._h + TINY)
                and np.all(np.dot(self._A, var) <= self._hA + TINY)
                and np.all(np.dot(self._A, var) >= self._lA - TINY)
            )
            if not success:
                logger.fatal(
                    "Hotstart fails but qpOASES does not report correctly.\n"
                    "var: {:}, lower_bound: {:}, higher_bound: {:}".format(
                        var, self._l, self._h
                    )
                )
                # TODO: Investigate why this happens and fix the relevant
                # code (in the qpOASES wrapper).
            else:
                return var * variable_scales
        else:
            logger.debug("Optimization fails. qpOASES error code: %d.", return_value)

            if (
                np.all(0 <= self._hA)
                and np.all(0 >= self._lA)
                and np.all(0 <= self._h)
                and np.all(0 >= self._l)
            ):
                logger.fatal(
                    "(0, 0) satisfies all constraints => failure is due to "
                    "numerical errors. A=%s, lA=%s, hA=%s, l=%s, h=%s",
                    self._A,
                    self._lA,
                    self._hA,
                    self._l,
                    self._h,
                )
            else:
                logger.debug("(0, 0) does not satisfy all constraints.")

        return_value = np.empty(self.get_no_vars())
        return_value[:] = np.nan
        return return_value
    def _solve_optimization(self, i):
        var = np.zeros(self.nV)
        if self._g[1] > 0:
            # Choose solver_minimizing
            if abs(self.solver_minimizing_recent_index - i) > 1:
                logger.debug("solver_minimizing [init]")
                return_value = self.solver_minimizing.init(
                    self._H, self._g, self._A, self._l, self._h,
                    self._lA, self._hA,
                    np.array([1000]),  # nWSR: max working-set recalculations
                )
            else:
                logger.debug("solver_minimizing [hotstart]")
                return_value = self.solver_minimizing.hotstart(
                    self._H, self._g, self._A, self._l, self._h,
                    self._lA, self._hA,
                    np.array([1000]),  # nWSR: max working-set recalculations
                )
            self.solver_minimizing_recent_index = i
            self.solver_minimizing.getPrimalSolution(var)
        else:
            # Choose solver_maximizing
            if abs(self.solver_maximizing_recent_index - i) > 1:
                logger.debug("solver_maximizing [init]")
                return_value = self.solver_maximizing.init(
                    self._H, self._g, self._A, self._l, self._h,
                    self._lA, self._hA,
                    np.array([1000]),  # nWSR: max working-set recalculations
                )
            else:
                logger.debug("solver_maximizing [hotstart]")
                return_value = self.solver_maximizing.hotstart(
                    self._H, self._g, self._A, self._l, self._h,
                    self._lA, self._hA,
                    np.array([1000]),  # nWSR: max working-set recalculations
                )
            self.solver_maximizing_recent_index = i
            self.solver_maximizing.getPrimalSolution(var)
        return return_value, var
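

# ---------------------------------------------------------------------------
# Minimal usage sketch. This block is illustrative only: the path and
# constraint constructors follow toppra's public API (SplineInterpolator,
# JointVelocityConstraint, JointAccelerationConstraint), but exact signatures
# may differ between toppra versions, and the numeric values below are made
# up. It is guarded so that it never runs on import.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import toppra as ta

    # A 1-DOF geometric path through three waypoints.
    path = ta.SplineInterpolator([0.0, 0.5, 1.0], [[0.0], [0.5], [1.0]])
    pc_vel = ta.constraint.JointVelocityConstraint([[-1.0, 1.0]])
    pc_acc = ta.constraint.JointAccelerationConstraint([[-2.0, 2.0]])
    gridpoints = np.linspace(0, path.duration, 51)

    solver = hotqpOASESSolverWrapper([pc_vel, pc_acc], path, gridpoints)
    solver.setup_solver()
    # One stagewise problem: H=None requests a linear objective, and
    # g = [0, -1] maximizes x at stage 0 subject to 0 <= x <= 1 and
    # 0 <= x_next <= 100.
    u_x = solver.solve_stagewise_optim(
        0, None, np.array([0.0, -1.0]), 0.0, 1.0, 0.0, 100.0
    )
    solver.close_solver()
    print("stage-0 optimum (u, x):", u_x)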