base.py

ODE integration of Hamilton’s equations.


Requires Python packages/modules:

TODO: broken version - needs major overhaul

class gme.knickpoints.base.CompositeSolution(gmeq: gme.core.equations.Equations, parameters: Dict, **kwargs)

Combine ray integrations.

From (1) initial profile, (2) initial corner, and (3) velocity boundary to generate a ‘complete’ ray integration solution from a complex topographic boundary.

__init__(gmeq: gme.core.equations.Equations, parameters: Dict, **kwargs)None

Initialize.

Parameters
  • gmeq – GME model equations class instance defined in gme.core.equations

  • parameters – dictionary of model parameter values to be used for equation substitutions

  • **kwargs – remaining keyword arguments (see base class for details)

create_solutions(t_end: float = 0.04, t_slip_end: float = 0.08, do_solns: Dict = {'ic': True, 'ip': True, 'vb': True}, n_rays: Dict = {'ic': 31, 'ip': 101, 'vb': 101}, n_t: Dict = {'ic': 1001, 'ip': 1001, 'vb': 1001})None

Create a set of solutions obtained by different methods.

merge_rays()None

Merge rays from different solution methods.

solve()None

Solve.

class gme.knickpoints.base.InitialCornerSolution(gmeq: gme.core.equations.Equations, parameters: Dict, **kwargs)

Integrate Hamilton’s equations (ODEs) from a ‘corner’ point.

Provides a set of ray integrations to span solutions along an initial profile and solutions from a slip velocity boundary.

__init__(gmeq: gme.core.equations.Equations, parameters: Dict, **kwargs)None

Initialize.

Parameters
  • gmeq – GME model equations class instance defined in gme.core.equations

  • parameters – dictionary of model parameter values to be used for equation substitutions

  • **kwargs – remaining keyword arguments (see base class for details)

initial_conditions(beta0_)Tuple[float, float, float, float]

Define initial conditions.

solve(report_pc_step=1, verbose=True)

Solve Hamilton’s equations.

class gme.knickpoints.base.InitialProfileSolution(gmeq: gme.core.equations.Equations, parameters: Dict, **kwargs)

Integrate Hamilton’s equations from an initial topographic profile.

Parameters
  • gmeq – GME model equations class instance defined in gme.core.equations

  • parameters – dictionary of model parameter values to be used for equation substitutions

  • **kwargs – remaining keyword arguments (see base class for details)

__init__(gmeq: gme.core.equations.Equations, parameters: Dict, **kwargs)None

Initialize.

initial_conditions(x_: float)Tuple[float, float, float, float]

Define initial conditions.

solve(report_pc_step=1)

Solve.

Code

"""
ODE integration of Hamilton's equations.

---------------------------------------------------------------------

Requires Python packages/modules:
  -  :mod:`NumPy <numpy>`
  -  :mod:`SymPy <sympy>`
  -  `GME`_

.. _GMPLib: https://github.com/geomorphysics/GMPLib
.. _GME: https://github.com/geomorphysics/GME
.. _Matrix: https://docs.sympy.org/latest/modules/matrices/immutablematrices.html

---------------------------------------------------------------------

TODO: broken version - needs major overhaul

"""
# Library
import warnings

# import logging
from typing import Tuple, Callable, Dict, Optional, Union, Any

# NumPy
import numpy as np

# SymPy
from sympy import Eq, atan2, Matrix

# GME
from gme.ode.base import BaseSolution
from gme.ode.velocity_boundary import VelocityBoundarySolution
from gme.core.symbols import x, rx, Lc, beta, px, pz

from gme.ode.base import rpt_tuple
from gme.core.equations import Equations
from gme.ode.utils import report_progress

warnings.filterwarnings("ignore")

__all__ = [
    "InitialProfileSolution",
    "InitialCornerSolution",
    "CompositeSolution",
]


# class DummySolution(BaseSolution):
#     solve: Optional[Callable]
#     t_ensemble_max: float
#     model_dXdt_lambda: Optional[Callable]
#     rpt_arrays: Optional[Dict]
#
#     def __init__(self, gmeq, parameters, **kwargs) -> None:
#         """
#         Constructor method.
#         """
#         super().__init__(gmeq, parameters, **kwargs)
#         self.solve = None
#         self.t_ensemble_max = 0
#         self.model_dXdt_lambda = None
#         self.rpt_arrays = None


class InitialProfileSolution(BaseSolution):
    """
    Integrate Hamilton's equations from an initial topographic profile.

    Args:
        gmeq:
            GME model equations class instance defined in
            :mod:`gme.core.equations`
        parameters:
            dictionary of model parameter values to be used for
            equation substitutions
        **kwargs:
            remaining keyword arguments (see base class for details)
    """

    # Prerequisites
    t_ensemble_max: float
    model_dXdt_lambda: Optional[Callable]
    rpt_arrays: Optional[Dict]

    # Definitions
    rz_initial_surface_eqn: Eq
    ic: Tuple[float, float, float, float]

    def __init__(self, gmeq: Equations, parameters: Dict, **kwargs) -> None:
        """Initialize."""
        super().__init__(gmeq, parameters, **kwargs)

        # Initial condition equations
        self.rz_initial_surface_eqn = gmeq.rz_initial_eqn.subs({rx: x})

    def initial_conditions(
        self, x_: float
    ) -> Tuple[float, float, float, float]:
        """Define initial conditions."""
        rx0_: float = x_
        rz0_ = float(
            self.rz_initial_surface_eqn.rhs.subs({x: x_}).subs(self.parameters)
        )
        px0_ = float(
            self.px_initial_surface_eqn.rhs.subs({x: x_}).subs(self.parameters)
        )
        pz0_ = float(
            self.pz_initial_surface_eqn.rhs.subs({x: x_}).subs(self.parameters)
        )
        return (rx0_, rz0_, px0_, pz0_)

    def solve(self, report_pc_step=1):
        """Solve."""
        self.prep_arrays()
        Lc_ = float(Lc.subs(self.parameters))
        # print(self.t_end)
        pc_progress = report_progress(i=0, n=self.n_rays, is_initial_step=True)
        # Points along the initial profile boundary
        for idx, x_ in enumerate(
            np.linspace(0, Lc_, self.n_rays, endpoint=True)
        ):
            # if self.verbose:
            #   print('{}: {}'.format(idx,np.round(x_,4)),end='\t')
            self.ic = self.initial_conditions(x_)
            print(f"Initial conditions {idx}: {self.ic}")
            self.model_dXdt_lambda = self.make_model()
            rpt_arrays = self.solve_Hamiltons_equations(
                t_array=self.ref_t_array.copy()
            )
            self.save(rpt_arrays, idx)
            pc_progress = report_progress(
                i=idx,
                n=self.n_rays,
                pc_step=report_pc_step,
                progress_was=pc_progress,
            )
        report_progress(
            i=idx,
            n=self.n_rays,
            pc_step=report_pc_step,
            progress_was=pc_progress,
        )


class InitialCornerSolution(BaseSolution):
    """
    Integrate Hamilton's equations (ODEs) from a 'corner' point.

    Provides a set of ray integrations to span solutions along an initial
    profile and solutions from a slip velocity boundary.
    """

    # Prerequisites
    t_ensemble_max: float
    model_dXdt_lambda: Optional[Callable]
    rpt_arrays: Optional[Dict]

    # Definitions
    px_initial_corner_eqn: Eq
    pz_initial_corner_eqn: Eq
    beta_surface_corner: Eq
    beta_velocity_corner: Eq
    rdot: Matrix
    ic: Tuple[float, float, float, float]

    def __init__(self, gmeq: Equations, parameters: Dict, **kwargs) -> None:
        """
        Initialize.

        Args:
            gmeq:
                GME model equations class instance defined in
                :mod:`gme.core.equations`
            parameters:
                dictionary of model parameter values to be used
                for equation substitutions
            **kwargs:
                remaining keyword arguments (see base class for details)
        """
        super().__init__(gmeq, parameters, **kwargs)
        self.px_initial_corner_eqn = self.gmeq.px_varphi_rx_beta_eqn.subs(
            {rx: 0}
        )
        self.pz_initial_corner_eqn = self.gmeq.pz_varphi_rx_beta_eqn.subs(
            {rx: 0}
        )
        self.beta_surface_corner = float(
            atan2(
                self.gmeq.px_initial_eqn.rhs.subs({x: 0}).subs(self.parameters),
                -self.gmeq.pz_initial_eqn.rhs.subs({x: 0}).subs(
                    self.parameters
                ),
            )
        )
        pz_velocity_corner = self.pz_velocity_boundary_eqn
        # HACK!!!  doing this to set up px0_poly_eqn
        _ = self.pxpz0_from_xiv0()
        px_velocity_corner = self.px_value(
            x_=0, pz_=pz_velocity_corner, parameters=self.parameters
        )
        self.beta_velocity_corner = float(
            atan2(px_velocity_corner, -pz_velocity_corner)
        )
        if self.choice == "geodesic":
            self.rdot = Matrix(
                [
                    self.gmeq.hamiltons_eqns[0].rhs.subs(parameters),
                    self.gmeq.hamiltons_eqns[1].rhs.subs(parameters),
                ]
            )
        else:
            self.rdot = None

    def initial_conditions(self, beta0_) -> Tuple[float, float, float, float]:
        """Define initial conditions."""
        rx0_ = 0
        rz0_ = 0
        px0_ = float(
            self.px_initial_corner_eqn.rhs.subs({beta: beta0_}).subs(
                self.parameters
            )
        )
        pz0_ = float(
            self.pz_initial_corner_eqn.rhs.subs({beta: beta0_}).subs(
                self.parameters
            )
        )
        if self.choice == "Hamilton":
            return (rx0_, rz0_, px0_, pz0_)
        else:
            return (
                rx0_,
                rz0_,
                *(
                    self.rdot.subs({rx: rx0_, px: px0_, pz: pz0_})
                    if self.rdot is not None
                    else (0, 0)
                ),
            )

    def solve(self, report_pc_step=1, verbose=True):
        """Solve Hamilton's equations."""
        print(
            "Solving Hamilton's equations"
            if self.choice == "Hamilton"
            else "Solving geodesic equations"
        )
        self.prep_arrays()
        # Surface tilt angles spanning velocity b.c. to initial surface b.c.
        pc_progress = report_progress(i=0, n=self.n_rays, is_initial_step=True)
        for idx, beta0_ in enumerate(
            np.linspace(
                self.beta_velocity_corner,
                self.beta_surface_corner,
                self.n_rays,
                endpoint=True,
            )
        ):
            if self.verbose == "very":
                print("{}: {}".format(idx, np.round(beta0_, 4)), end="\t")
            self.ic = self.initial_conditions(beta0_)
            self.model_dXdt_lambda = self.make_model(do_verbose=False)
            if self.customize_t_fn is not None:
                t_array = self.ref_t_array.copy()
                t_limit = self.customize_t_fn(
                    t_array[-1], self.ic[3] / self.ic[2]
                )
                t_array = t_array[t_array <= t_limit]
                # print(f't_end = {t_array[-1]}')
            else:
                t_array = self.ref_t_array.copy()
            rpt_arrays = self.solve_Hamiltons_equations(t_array=t_array)
            # print(list(zip(rpt_arrays['rx'])))
            self.save(rpt_arrays, idx)
            pc_progress = report_progress(
                i=idx,
                n=self.n_rays,
                pc_step=report_pc_step,
                progress_was=pc_progress,
            )
        report_progress(
            i=idx,
            n=self.n_rays,
            pc_step=report_pc_step,
            progress_was=pc_progress,
        )


class CompositeSolution(BaseSolution):
    """
    Combine ray integrations.

    From (1) initial profile,
    (2) initial corner, and (3) velocity boundary
    to generate a 'complete' ray integration solution from a
    complex topographic boundary.
    """

    # Prerequisites
    t_ensemble_max: float
    model_dXdt_lambda: Optional[Callable]
    rpt_arrays: Optional[Dict]

    # Definitions
    do_solns: bool
    t_end: float
    t_slip_end: float
    ips: Any
    ics: Any
    vbs: Any
    n_rays: int

    def __init__(self, gmeq: Equations, parameters: Dict, **kwargs) -> None:
        """
        Initialize.

        Args:
            gmeq:
                GME model equations class instance defined in
                :mod:`gme.core.equations`
            parameters:
                dictionary of model parameter values
                to be used for equation substitutions
            **kwargs:
                remaining keyword arguments (see base class for details)
        """
        super().__init__(gmeq, parameters, **kwargs)

    def create_solutions(
        self,
        t_end: float = 0.04,
        t_slip_end: float = 0.08,
        do_solns: Dict = dict(ip=True, ic=True, vb=True),
        n_rays: Dict = dict(ip=101, ic=31, vb=101),
        n_t: Dict = dict(ip=1001, ic=1001, vb=1001),
    ) -> None:
        """Create a set of solutions obtained by different methods."""
        self.do_solns: Dict = do_solns
        self.t_end: float = t_end
        self.t_slip_end: float = t_slip_end

        self.ips = (
            InitialProfileSolution(
                self.gmeq,
                parameters=self.parameters,
                method=self.method,
                do_dense=self.do_dense,
                t_end=self.t_end,
                n_rays=n_rays["ip"],
                n_t=n_t["ip"],
            )
            if do_solns["ip"]
            else None
        )

        self.ics = (
            InitialCornerSolution(
                self.gmeq,
                parameters=self.parameters,
                method=self.method,
                do_dense=self.do_dense,
                t_end=self.t_end,
                n_rays=n_rays["ic"],
                n_t=n_t["ic"],
            )
            if do_solns["ic"]
            else None
        )

        # t_slip_end = t_scale*1.5 if t_slip_end is None else t_slip_end
        self.vbs = (
            VelocityBoundarySolution(
                self.gmeq,
                parameters=self.parameters,
                method=self.method,
                do_dense=self.do_dense,
                t_end=self.t_end,
                t_slip_end=self.t_slip_end,
                n_rays=n_rays["vb"],
                n_t=n_t["vb"],
            )
            if do_solns["vb"]
            else None
        )

    def solve(self) -> None:
        """Solve."""
        soln_method: Union[
            InitialProfileSolution,
            InitialCornerSolution,
            VelocityBoundarySolution,
        ]
        if self.do_solns["ip"]:
            soln_method = self.ips
        elif self.do_solns["ic"]:
            soln_method = self.ics
        elif self.do_solns["vb"]:
            soln_method = self.vbs
        soln_method.solve()
        self.t_ensemble_max = soln_method.t_ensemble_max
        self.model_dXdt_lambda = soln_method.model_dXdt_lambda

    # def solve(self) -> None:
    #     if self.do_solns['ip']:
    #         self.ips.solve()
    #     if self.do_solns['ic']:
    #         self.ics.solve()
    #     if self.do_solns['vb']:
    #         self.vbs.solve()
    #     self.t_ensemble_max \
    #         = self.vbs.t_ensemble_max if self.do_solns['vb'] \
    #         else self.ics.t_ensemble_max if self.do_solns['ic'] \
    #         else self.ips.t_ensemble_max if self.do_solns['ip'] \
    #         else None
    #     self.model_dXdt_lambda \
    #         = self.vbs.model_dXdt_lambda if self.do_solns['vb'] \
    #         else self.ics.model_dXdt_lambda if self.do_solns['ic'] \
    #         else self.ips.model_dXdt_lambda if self.do_solns['ip'] \
    #         else None

    def merge_rays(self) -> None:
        """Merge rays from different solution methods."""
        # Combine all three solutions such that rays are all in rz order
        self.rpt_arrays = {}
        soln_method: Union[
            InitialProfileSolution,
            InitialCornerSolution,
            VelocityBoundarySolution,
        ]
        for rpt_ in rpt_tuple:
            soln_method = self.vbs
            vbs_arrays = (
                [] if soln_method is None else soln_method.rpt_arrays[rpt_][:-1]
            )
            soln_method = self.ics
            ics_arrays = (
                [] if soln_method is None else soln_method.rpt_arrays[rpt_]
            )
            soln_method = self.ips
            ips_arrays = (
                [] if soln_method is None else soln_method.rpt_arrays[rpt_]
            )
            self.rpt_arrays.update({rpt_: vbs_arrays + ics_arrays + ips_arrays})
        # Eliminate empty rays
        rx_arrays = self.rpt_arrays["rx"].copy()
        for rpt_ in rpt_tuple:
            self.rpt_arrays[rpt_] = [
                rpt_array
                for rx_array, rpt_array in zip(rx_arrays, self.rpt_arrays[rpt_])
                if len(rx_array) >= 0
            ]
        self.n_rays = len(self.rpt_arrays["t"])
        self.n_rays = len(self.rpt_arrays["t"])