"""
Decline Curve Models
Copyright © 2020 David S. Fulford
Author
------
David S. Fulford
Derrick W. Turk
Notes
-----
Created on August 5, 2019
"""
import sys
from math import exp, expm1, log, log1p, ceil as ceiling, floor
import warnings
import dataclasses as dc
from dataclasses import dataclass, field
import numpy as np
from scipy.special import expi as ei, gammainc # type: ignore
from scipy.integrate import fixed_quad # type: ignore
from abc import ABC, abstractmethod
from typing import (TypeVar, Type, List, Dict, Tuple, Any,
Sequence, Iterable, Optional, Callable, ClassVar, Union)
from numpy.typing import NDArray
from typing import cast
from .base import (ParamDesc, DeclineCurve, PrimaryPhase, SecondaryPhase,
DAYS_PER_MONTH, DAYS_PER_YEAR, LOG_EPSILON, MIN_EPSILON)
NDFloat = NDArray[np.float64]
@dataclass
class NullPrimaryPhase(PrimaryPhase):
"""
A null `PrimaryPhase` class that always returns zeroes.
Parameters
----------
None
"""
def _set_defaults(self) -> None:
# Do not associate with the null secondary phase
pass
def _qfn(self, t: NDFloat) -> NDFloat:
return np.zeros_like(t, dtype=np.float64)
def _Nfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
return np.zeros_like(t, dtype=np.float64)
def _Dfn(self, t: NDFloat) -> NDFloat:
return np.zeros_like(t, dtype=np.float64)
def _Dfn2(self, t: NDFloat) -> NDFloat:
return np.zeros_like(t, dtype=np.float64)
def _betafn(self, t: NDFloat) -> NDFloat:
return np.zeros_like(t, dtype=np.float64)
def _bfn(self, t: NDFloat) -> NDFloat:
return np.zeros_like(t, dtype=np.float64)
@classmethod
def get_param_descs(cls) -> List[ParamDesc]:
return []
class MultisegmentHyperbolic(PrimaryPhase):
"""
A base class for Hyperbolic Models that generalizes for any representation of
hyperbolic "Arps'-type" models. Each child class must implement the `_segments`
function which generates the initial parameters of an arbitary number of
hyperbolic segments.
"""
T_IDX: ClassVar[int] = 0
Q_IDX: ClassVar[int] = 1
D_IDX: ClassVar[int] = 2
B_IDX: ClassVar[int] = 3
N_IDX: ClassVar[int] = 4
B_EPSILON: ClassVar[float] = 1e-10
segment_params: NDFloat
@abstractmethod
def _segments(self) -> NDFloat:
"""
Precache the initial conditions of each hyperbolic segment. Should assign a list of params
for the start condition of each segment like:
self.params = params = np.array([
[t_1, q_1, D_1, b_1, N_1],
[t_2, q_2, D_2, b_2, N_2],
[..., ..., ..., ..., ...],
[t_m, q_n, D_n, b_n, N_m],
], dtype=np.float64)
"""
raise NotImplementedError
def _validate(self) -> None:
# this is a little naughty: bypass the "frozen" protection, just this once...
# naturally, this should only be called during the __post_init__ process
object.__setattr__(self, 'segment_params', self._segments())
@staticmethod
def _qcheck(t0: float, q: float, D: float, b: float, N: float,
t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the proper Arps form of q
"""
dt = DeclineCurve._validate_ndarray(t - t0)
if D < MIN_EPSILON:
return np.full_like(t, q, dtype=np.float64)
# Handle overflow for these function
# q * np.exp(-D * dt)
# q * np.log(1.0 + D * b * dt) ** (1.0 / b)
if b <= MultisegmentHyperbolic.B_EPSILON:
D_dt = D * dt
else:
D_dt = 1.0 / b * np.log(1.0 + D * b * dt)
np.putmask(D_dt, mask=D_dt > LOG_EPSILON, values=np.inf) # type: ignore
np.putmask(D_dt, mask=D_dt < -LOG_EPSILON, values=-np.inf) # type: ignore
with np.errstate(over='ignore', under='ignore', invalid='ignore'):
return q * np.exp(-D_dt)
@staticmethod
def _Ncheck(t0: float, q: float, D: float, b: float, N: float,
t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the proper Arps form of N
"""
dt = DeclineCurve._validate_ndarray(t - t0)
if q < MIN_EPSILON:
return cast(NDFloat, np.atleast_1d(N) + np.zeros_like(t, dtype=np.float64))
if D < MIN_EPSILON:
return np.atleast_1d(N + q * dt)
if abs(1.0 - b) < MIN_EPSILON:
return N + q / D * np.log1p(D * dt)
# Handle overflow for this function
# N + q / ((1.0 - b) * D) * (1.0 - (1.0 + b * D * dt) ** (1.0 - 1.0 / b))
if b <= MultisegmentHyperbolic.B_EPSILON:
D_dt = -D * dt
q_b_D = q / D
else:
D_dt = (1.0 - 1.0 / b) * np.log(1.0 + b * D * dt)
q_b_D = q / ((1.0 - b) * D)
np.putmask(D_dt, mask=D_dt > LOG_EPSILON, values=np.inf) # type: ignore
np.putmask(D_dt, mask=D_dt < -LOG_EPSILON, values=-np.inf) # type: ignore
with np.errstate(over='ignore', under='ignore', invalid='ignore'):
return N - q_b_D * np.expm1(D_dt)
@staticmethod
def _Dcheck(t0: float, q: float, D: float, b: float, N: float,
t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the proper Arps form of D
"""
dt = DeclineCurve._validate_ndarray(t - t0)
if D < MIN_EPSILON:
return np.full_like(t, D, dtype=np.float64)
if b < MIN_EPSILON:
b = 0.0
with np.errstate(over='ignore', under='ignore', invalid='ignore'):
return D / (1.0 + D * b * dt)
@staticmethod
def _Dcheck2(t0: float, q: float, D: float, b: float, N: float,
t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the derivative of the proper Arps form of D
"""
dt = DeclineCurve._validate_ndarray(t - t0)
if D < MIN_EPSILON:
return np.full_like(t, D, dtype=np.float64)
Denom = 1.0 + D * b * dt
return -b * D * D / (Denom * Denom)
def _vectorize(self, fn: Callable[..., NDFloat],
t: Union[float, NDFloat]) -> NDFloat:
"""
Vectorize the computation of a parameter
"""
t = np.atleast_1d(t)
p = self.segment_params
x = np.zeros_like(t, dtype=np.float64)
for i in range(p.shape[0]):
where_seg = t >= p[i, self.T_IDX]
if i < p.shape[0] - 1:
where_seg = where_seg & (t < p[i + 1, self.T_IDX])
x[where_seg] = fn(*p[i], t[where_seg])
return x
def _qfn(self, t: NDFloat) -> NDFloat:
return self._vectorize(self._qcheck, t)
def _Nfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
return self._vectorize(self._Ncheck, t)
def _Dfn(self, t: NDFloat) -> NDFloat:
return self._vectorize(self._Dcheck, t)
def _Dfn2(self, t: NDFloat) -> NDFloat:
return self._vectorize(self._Dcheck2, t)
def _betafn(self, t: NDFloat) -> NDFloat:
return self._vectorize(self._Dcheck, t) * t
def _bfn(self, t: NDFloat) -> NDFloat:
return self._vectorize(lambda *p: p[self.B_IDX], t)
@classmethod
def nominal_from_secant(cls, D: float, b: float) -> float:
if b <= MultisegmentHyperbolic.B_EPSILON:
return cls.nominal_from_tangent(D)
if D < MIN_EPSILON:
return 0.0 # pragma: no cover
if D >= 1.0:
return np.inf # pragma: no cover
# D < 1 per validation, so this should never overflow
return ((1.0 - D) ** -b - 1.0) / b
@classmethod
def secant_from_nominal(cls, D: float, b: float) -> float:
if b <= MultisegmentHyperbolic.B_EPSILON:
return cls.tangent_from_nominal(D)
# Handle overflow for this function
# Deff = 1.0 - 1.0 / (1.0 + D * b) ** (1.0 / b)
if D < MIN_EPSILON:
return 0.0 # pragma: no cover
D_b = 1.0 + D * b
if D_b < MIN_EPSILON:
return -np.inf # pragma: no cover
D_dt = 1.0 / b * np.log(D_b)
if D_dt > LOG_EPSILON:
# >= 100% decline is not possible
return 1.0 # pragma: no cover
return -expm1(-D_dt)
@classmethod
def nominal_from_tangent(cls, D: float) -> float:
if D < MIN_EPSILON:
return 0.0 # pragma: no cover
if D >= 1.0:
return np.inf # pragma: no cover
return -log1p(-D)
@classmethod
def tangent_from_nominal(cls, D: float) -> float:
if D < MIN_EPSILON:
return 0.0 # pragma: no cover
if D > LOG_EPSILON:
# >= 100% decline is not possible
return 1.0 # pragma: no cover
return -expm1(-D)
[docs]@dataclass(frozen=True)
class MH(MultisegmentHyperbolic):
"""
Modified Hyperbolic Model
Robertson, S. 1988. Generalized Hyperbolic Equation.
Available from SPE, Richardson, Texas, USA. SPE-18731-MS.
Parameters
----------
qi: float
The initial production rate in units of ``volume / day``.
Di: float
The initial decline rate in secant effective decline aka annual
effective percent decline, i.e.
.. math::
D_i = 1 - \\frac{q(t=1 \\, year)}{qi}
.. math::
D_i = 1 - (1 + 365.25 \\, D_{nom} \\, b) ^ \\frac{-1}{b}
where ``Dnom`` is defined as :math:`\\frac{d}{dt}\\textrm{ln} \\, q`
and has units of ``1 / day``.
bi: float
The (initial) hyperbolic parameter, defined as :math:`\\frac{d}{dt}\\frac{1}{D}`.
This parameter is dimensionless.
Dterm: float
The terminal secant effective decline rate aka annual effective percent decline.
"""
qi: float
Di: float
bi: float
Dterm: float = 0.0
validate_params: Iterable[bool] = field(default_factory=lambda: [True] * 4)
def _validate(self) -> None:
if self.nominal_from_secant(self.Di, self.bi) < self.nominal_from_tangent(self.Dterm):
raise ValueError('Di < Dterm')
super()._validate()
def _segments(self) -> NDFloat:
"""
Precache the initial conditions of each hyperbolic segment.
"""
Di_nom = self.nominal_from_secant(self.Di, self.bi) / DAYS_PER_YEAR
Dterm_nom = self.nominal_from_tangent(self.Dterm) / DAYS_PER_YEAR
if Di_nom < MIN_EPSILON or Dterm_nom < MIN_EPSILON or self.bi < MIN_EPSILON:
return np.array([
[0.0, self.qi, Di_nom, self.bi, 0.0]
], dtype=np.float64)
tterm = ((1.0 / Dterm_nom) - (1.0 / Di_nom)) / self.bi
qterm = self._qcheck(0.0, self.qi, Di_nom, self.bi, 0.0, np.array(tterm)).item()
Nterm = self._Ncheck(0.0, self.qi, Di_nom, self.bi, 0.0, np.array(tterm)).item()
return np.array([
[0.0, self.qi, Di_nom, self.bi, 0.0],
[tterm, qterm, Dterm_nom, 0.0, Nterm]
], dtype=np.float64)
[docs] @classmethod
def get_param_descs(cls) -> List[ParamDesc]:
return [
ParamDesc(
'qi', 'Initial rate [vol/day]',
0.0, None,
lambda r, n: r.uniform(1e-10, 1e6, n)),
ParamDesc( # TODO
'Di', 'Initial decline [sec. eff. / yr]',
0.0, 1.0,
lambda r, n: r.uniform(0.0, 1.0, n),
exclude_upper_bound=True),
ParamDesc(
'bi', 'Hyperbolic exponent',
0.0, 2.0,
lambda r, n: r.uniform(0.0, 2.0, n)),
ParamDesc( # TODO
'Dterm', 'Terminal decline [tan. eff. / yr]',
0.0, 1.0,
lambda r, n: np.zeros(n, dtype=np.float64),
exclude_upper_bound=True)
]
[docs]@dataclass(frozen=True)
class THM(MultisegmentHyperbolic):
"""
Transient Hyperbolic Model
Fulford, D. S., and Blasingame, T. A. 2013. Evaluation of Time-Rate
Performance of Shale Wells using the Transient Hyperbolic Relation.
Presented at SPE Unconventional Resources Conference – Canada in Calgary,
Alberta, Canda, 5–7 November. SPE-167242-MS.
https://doi.org/10.2118/167242-MS.
Analytic Approximation
Fulford, D.S. 2018. A Model-Based Diagnostic Workflow for Time-Rate
Performance of Unconventional Wells. Presented at Unconventional Resources
Conference in Houston, Texas, USA, 23–25 July. URTeC-2903036.
https://doi.org/10.15530/urtec-2018-2903036.
Parameters
----------
qi: float
The initial production rate in units of ``volume / day``.
Di: float
The initial decline rate in secant effective decline aka annual
effective percent decline, i.e.
.. math::
D_i = 1 - \\frac{q(t=1 \\, year)}{qi}
.. math::
D_i = 1 - (1 + 365.25 \\, D_{nom} \\, b) ^ \\frac{-1}{b}
where ``Dnom`` is defined as :math:`\\frac{d}{dt}\\textrm{ln} \\, q`
and has units of ``1 / day``.
bi: float
The initial hyperbolic parameter, defined as :math:`\\frac{d}{dt}\\frac{1}{D}`.
This parameter is dimensionless. Advised to always be set to ``2.0`` to represent
transient linear flow.
See literature for more details.
bf: float
The final hyperbolic parameter after transition. Represents the boundary-dominated or
boundary-influenced flow regime.
telf: float
The time to end of linear flow in units of ``day``, or more specifically the time at
which ``b(t) < bi``. Visual end of half slope occurs ``~2.5x`` after ``telf``.
bterm: Optional[float] = None
The terminal value of the hyperbolic parameter. Has two interpretations:
If ``tterm > 0`` then the terminal regime is a hyperbolic regime with ``b = bterm``
and the parameter is given as the hyperbolic parameter.
If ``tterm = 0`` then the terminal regime is an exponential regime with
``Dterm = bterm`` and the parameter is given as secant effective decline.
tterm: Optional[float] = None
The time to start of the terminal regime. Setting ``tterm = 0.0`` creates an exponential
terminal regime, while setting ``tterm > 0.0`` creates a hyperbolic terminal regime.
"""
qi: float
Di: float
bi: float
bf: float
telf: float
bterm: float = 0.0
tterm: float = 0.0
validate_params: Iterable[bool] = field(default_factory=lambda: [True] * 7)
EXP_GAMMA: ClassVar[float] = exp(0.5572156)
EXP_1: ClassVar[float] = exp(1.0)
def _validate(self) -> None:
# TODO: do we want to deal with optional params at all?
if self.bi < self.bf:
raise ValueError('bi < bf')
if self.bf < self.bterm and self.tterm != 0.0:
raise ValueError('bf < bterm and tterm != 0')
# cheat to fix this
# object.__setattr__(self, 'bterm', self.bf)
pass
if self.tterm != 0.0 and self.tterm * DAYS_PER_YEAR < self.telf:
raise ValueError('tterm < telf')
super()._validate()
def _segments(self) -> NDFloat:
t1 = 0.0
t2 = self.telf * (self.EXP_1 - 1.0)
t3 = self.telf * (self.EXP_1 + 1.0)
tterm = self.tterm * DAYS_PER_YEAR
b1 = self.bi
b2 = self.bi - ((self.bi - self.bf) / self.EXP_1)
b3 = self.bf
bterm = self.bterm
q1 = self.qi
D1 = self.nominal_from_secant(self.Di, self.bi) / DAYS_PER_YEAR
N1 = 0.0
if tterm == 0.0 and bterm == 0.0:
# no terminal segment
segments = np.array(
[
[t1, q1, D1, b1, N1],
[t2, None, None, b2, None],
[t3, None, None, b3, None]
],
dtype=np.float64
)
elif tterm != 0.0:
# hyperbolic terminal segment
t4 = tterm if tterm >= t3 else self.telf * 7.0
b4 = min(bterm, b3)
segments = np.array(
[
[t1, q1, D1, b1, N1],
[t2, None, None, b2, None],
[t3, None, None, b3, None],
[t4, None, None, b4, None],
],
dtype=np.float64
)
elif tterm == 0.0 and bterm != 0.0:
# exponential terminal segment
D2 = self._Dcheck(t1, q1, D1, b1, 0.0, t2).item()
q2 = self._qcheck(t1, q1, D1, b1, 0.0, t2).item()
D3 = self._Dcheck(t2, q2, D2, b2, 0.0, t3).item()
D4 = self.nominal_from_tangent(bterm) / DAYS_PER_YEAR
b4 = 0.0
if b3 <= 0:
t4 = t3
else:
t4 = max(t3, t3 + (1.0 / D4 - 1.0 / D3) / b3)
if t4 == t3:
segments = np.array(
[
[t1, q1, D1, b1, N1],
[t2, None, None, b2, None],
[t4, None, None, b4, None],
],
dtype=np.float64
)
else:
segments = np.array(
[
[t1, q1, D1, b1, N1],
[t2, None, None, b2, None],
[t3, None, None, b3, None],
[t4, None, None, b4, None],
],
dtype=np.float64
)
# Compute initial values for each segment after the first, from the
# previous segment's values
for i in range(segments.shape[0] - 1):
p = [*segments[i], segments[i + 1, self.T_IDX]]
segments[i + 1, self.D_IDX] = self._Dcheck(*p).item()
segments[i + 1, self.Q_IDX] = self._qcheck(*p).item()
segments[i + 1, self.N_IDX] = self._Ncheck(*p).item()
return segments
[docs] def transient_rate(self, t: Union[float, NDFloat], **kwargs: Any) -> NDFloat:
"""
Compute the rate function using full definition.
Uses :func:`scipy.integrate.fixed_quad` to integrate :func:`transient_D`.
.. math::
q(t) = e^{-\\int_0^t D(t) \\, dt}
Parameters
----------
t: Union[float, numpy.NDFloat]
An array of time values to evaluate.
**kwargs
Additional keyword arguments passed to :func:`scipy.integrate.fixed_quad`.
Returns
-------
numpy.NDFloat
"""
t = self._validate_ndarray(t)
return self._transqfn(t, **kwargs)
[docs] def transient_cum(self, t: Union[float, NDFloat], **kwargs: Any) -> NDFloat:
"""
Compute the cumulative volume function using full definition.
Uses :func:`scipy.integrate.fixed_quad` to integrate :func:`transient_q`.
.. math::
N(t) = \\int_0^t q(t) \\, dt
Parameters
----------
t: Union[float, numpy.NDFloat]
An array of time values to evaluate.
**kwargs
Additional keyword arguments passed to :func:`scipy.integrate.fixed_quad`.
Returns
-------
numpy.NDFloat
"""
t = self._validate_ndarray(t)
return self._transNfn(t, **kwargs)
[docs] def transient_D(self, t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the D-parameter function using full definition.
.. math::
D(t) = \\frac{1}{\\frac{1}{Di} + b_i t + \\frac{bi - bf}{c}
(\\textrm{Ei}[-e^{-c \\, (t -t_{elf}) + e^(\\gamma)}]
- \\textrm{Ei}[-e^{c \\, t_{elf} + e^(\\gamma)}])}
Parameters
----------
t: Union[float, numpy.NDFloat]
An array of time values to evaluate.
Returns
-------
numpy.NDFloat
"""
t = self._validate_ndarray(t)
return self._transDfn(t)
[docs] def transient_beta(self, t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the beta-parameter function using full definition.
.. math::
\\beta(t) = \\frac{t}{\\frac{1}{Di} + b_i t + \\frac{bi - bf}{c}
(\\textrm{Ei}[-e^{-c \\, (t -t_{elf}) + e^(\\gamma)}]
- \\textrm{Ei}[-e^{c \\, t_{elf} + e^(\\gamma)}])}
Parameters
----------
t: Union[float, numpy.NDFloat]
An array of time values to evaluate.
Returns
-------
numpy.NDFloat
"""
t = self._validate_ndarray(t)
return self._transDfn(t) * t
[docs] def transient_b(self, t: Union[float, NDFloat]) -> NDFloat:
"""
Compute the b-parameter function using full definition.
.. math::
b(t) = b_i - (b_i - b_f) e^{-\\textrm{exp}[{-c * (t - t_{elf}) + e^{\\gamma}}]}
where:
.. math::
c & = \\frac{e^{\\gamma}}{1.5 \\, t_{elf}} \\\\
\\gamma & = 0.57721566... \\; \\textrm{(Euler-Mascheroni constant)}
Parameters
----------
t: Union[float, numpy.NDFloat]
An array of time values to evaluate.
Returns
-------
numpy.NDFloat
"""
t = self._validate_ndarray(t)
return self._transbfn(t)
def _transNfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
kwargs.setdefault('n', 10)
return self._integrate_with(lambda t: self._transqfn(t, **kwargs), t, **kwargs)
def _transqfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
kwargs.setdefault('n', 10)
qi = self.qi
Dnom_i = self.nominal_from_secant(self.Di, self.bi) / DAYS_PER_YEAR
D_dt = Dnom_i - self._integrate_with(self._transDfn, t, **kwargs)
where_eps = abs(D_dt) > LOG_EPSILON
result = np.zeros_like(t, dtype=np.float64)
result[where_eps] = 0.0
result[~where_eps] = qi * np.exp(D_dt)
return result
def _transDfn(self, t: NDFloat) -> NDFloat:
try:
import mpmath as mp # type: ignore
except ImportError:
print('`mpmath` not installed, please install it compute the transient THM functions',
file=sys.stderr)
return np.full_like(t, np.nan, dtype=np.float64)
t = np.atleast_1d(t)
qi = self.qi
bi = self.bi
bf = self.bf
telf = self.telf
bterm = self.bterm
tterm = self.tterm * DAYS_PER_YEAR
if self.Di == 0.0:
return np.full_like(t, 0.0, dtype=np.float64)
Dnom_i = self.nominal_from_secant(self.Di, self.bi) / DAYS_PER_YEAR
if Dnom_i < MIN_EPSILON:
# no need to compute transient function
return self._Dcheck(0.0, qi, Dnom_i, bi, 0.0, t)
elif Dnom_i < MIN_EPSILON:
raise ValueError(f'invalid Dnom in _transDfn {Dnom_i}') # pragma: no cover
if telf < MIN_EPSILON:
# telf is too small to compute transient function
D = self._Dcheck(0.0, qi, Dnom_i, bf, 0.0, t)
Dterm = self._Dcheck(0.0, qi, Dnom_i, bf, 0.0, tterm).item()
else:
# transient function
if tterm > 0.0:
where_term = t >= tterm
else:
# no known terminal times in this array, might be some later if exponential terminal
where_term = np.full_like(t, False, dtype=np.bool_)
c = self.EXP_GAMMA / (1.5 * telf)
D_denom = np.full_like(t, np.nan, dtype=np.float64)
D_denom[~where_term] = (
1.0 / Dnom_i
+ bi * t[~where_term]
- ei(-np.exp(c * telf + self.EXP_GAMMA))
)
if abs(bi - bf) >= MIN_EPSILON:
for i, _t in enumerate(t):
if where_term[i]:
break
D_denom[i] += (bi - bf) / c * mp.ei(-mp.exp(-c * (_t - telf) + self.EXP_GAMMA))
D = 1.0 / D_denom
if tterm > 0.0:
D_denom = (
1.0 / Dnom_i
+ bi * tterm
- ei(-np.exp(c * telf + self.EXP_GAMMA))
)
if abs(bi - bf) >= MIN_EPSILON:
D_denom += (bi - bf) / c * mp.ei(-mp.exp(-c * (tterm - telf) + self.EXP_GAMMA))
Dterm = float(1.0 / D_denom)
else:
Dterm = 0.0
# terminal regime
if tterm != 0.0 or bterm != 0.0:
if tterm > 0.0:
# hyperbolic
where_term = t > tterm
D[where_term] = self._Dcheck(tterm, 1.0, Dterm, bterm, 0.0, t[where_term])
elif tterm == 0.0:
# exponential
Dterm = self.nominal_from_tangent(bterm) / DAYS_PER_YEAR
where_term = Dterm >= D
D[where_term] = self._Dcheck(tterm, 1.0, Dterm, 0.0, 0.0, t[where_term])
return D
def _transbfn(self, t: NDFloat) -> NDFloat:
t = np.atleast_1d(t)
bi = self.bi
bf = self.bf
telf = self.telf
bterm = self.bterm
tterm = self.tterm * DAYS_PER_YEAR
if telf >= MIN_EPSILON:
c = self.EXP_GAMMA / (1.5 * telf)
b = bi - (bi - bf) * np.exp(-np.exp(-c * (t - telf) + self.EXP_GAMMA))
else:
b = np.full_like(t, bf, dtype=np.float64)
# terminal regime
if tterm != 0.0 or bterm != 0:
if tterm > 0.0:
# hyperbolic
where_term = t > tterm
b[where_term] = bterm
elif tterm == 0.0:
# exponential
Dterm = self.nominal_from_tangent(bterm) / DAYS_PER_YEAR
D = self._transDfn(t)
where_term = Dterm >= D
b[where_term] = 0.0
return b
@classmethod
def get_param_descs(cls) -> List[ParamDesc]:
return [
ParamDesc(
'qi', 'Initial rate [vol/day]',
0.0, None,
lambda r, n: r.uniform(1.0, 2e4, n)),
ParamDesc( # TODO
'Di', 'Initial decline [sec. eff. / yr]',
0.0, 1.0,
lambda r, n: r.uniform(0.0, 1.0, n),
exclude_upper_bound=True),
ParamDesc(
'bi', 'Initial hyperbolic exponent',
0.0, 2.0,
lambda r, n: np.full(n, 2.0)),
ParamDesc( # TODO
'bf', 'Final hyperbolic exponent',
0.0, 2.0,
lambda r, n: r.uniform(0.0, 1.0, n)),
ParamDesc( # TODO
'telf', 'Time to end of linear flow [days]',
None, None,
lambda r, n: r.uniform(1e-10, 365.25, n)),
ParamDesc(
'bterm', 'Terminal hyperbolic exponent',
0.0, 2.0,
lambda r, n: np.full(n, 0.0)),
ParamDesc(
'tterm', 'Terminal time [years]',
0.0, None,
lambda r, n: np.full(n, 0.0))
]
[docs]@dataclass(frozen=True)
class PLE(PrimaryPhase):
"""
Power-Law Exponential Model
Ilk, D., Perego, A. D., Rushing, J. A., and Blasingame, T. A. 2008.
Exponential vs. Hyperbolic Decline in Tight Gas Sands – Understanding
the Origin and Implications for Reserve Estimates Using Arps Decline Curves.
Presented at SPE Annual Technical Conference and Exhibition in Denver,
Colorado, USA, 21–24 September. SPE-116731-MS. https://doi.org/10.2118/116731-MS.
Ilk, D., Rushing, J. A., and Blasingame, T. A. 2009.
Decline Curve Analysis for HP/HT Gas Wells: Theory and Applications.
Presented at SPE Annual Technical Conference and Exhibition in New Orleands,
Louisiana, USA, 4–7 October. SPE-125031-MS. https://doi.org/10.2118/125031-MS.
Parameters
----------
qi: float
The initial production rate in units of ``volume / day``.
Di: float
The initial decline rate in nominal decline rate defined as ``d[ln q] / dt``
and has units of ``1 / day``.
Dterm: float
The terminal decline rate in nominal decline rate, has units of ``1 / day``.
n: float
The n exponent.
"""
qi: float
Di: float
Dinf: float
n: float
validate_params: Iterable[bool] = field(default_factory=lambda: [True] * 4)
def _validate(self) -> None:
if self.Dinf > self.Di:
raise ValueError('Dinf > Di')
def _qfn(self, t: NDFloat) -> NDFloat:
qi = self.qi
Di = self.Di
Dinf = self.Dinf
n = self.n
return qi * np.exp(-Di * t ** n - Dinf * t)
def _Nfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
return self._integrate_with(self._qfn, t, **kwargs)
def _Dfn(self, t: NDFloat) -> NDFloat:
Di = self.Di
Dinf = self.Dinf
n = self.n
return Dinf + Di * n * t ** (n - 1.0)
def _Dfn2(self, t: NDFloat) -> NDFloat:
Di = self.Di
Dinf = self.Dinf
n = self.n
return Dinf + Di * n * (n - 1.0) * t ** (n - 2.0)
def _betafn(self, t: NDFloat) -> NDFloat:
Di = self.Di
Dinf = self.Dinf
n = self.n
return Dinf * t + Di * n * t ** n
def _bfn(self, t: NDFloat) -> NDFloat:
Di = self.Di
Dinf = self.Dinf
n = self.n
Denom = (Dinf * t + Di * n * t ** n)
return Di * (1.0 - n) * n * t ** n / (Denom * Denom)
[docs] @classmethod
def get_param_descs(cls) -> List[ParamDesc]:
return [
ParamDesc(
'qi', 'Initial rate [vol/day]',
0, None,
lambda r, n: r.uniform(1e-10, 1e6, n)),
ParamDesc(
'Di', 'Initial decline rate [/day]',
0.0, None,
lambda r, n: r.uniform(0.0, 1e3, n)),
ParamDesc(
'Dinf', 'Terminal decline rate [/day]',
0, None,
lambda r, n: r.uniform(0.0, 1e3, n)),
ParamDesc(
'n', 'PLE exponent',
0.0, 1.0,
lambda r, n: r.uniform(1e-6, 1.0, n),
exclude_lower_bound=True,
exclude_upper_bound=True),
]
[docs]@dataclass(frozen=True)
class SE(PrimaryPhase):
"""
Stretched Exponential
Valkó, P. P. Assigning Value to Stimulation in the Barnett Shale:
A Simultaneous Analysis of 7000 Plus Production Histories and Well
Completion Records. 2009. Presented at SPE Hydraulic Fracturing
Technology Conference in College Station, Texas, USA, 19–21 January.
SPE-119369-MS. https://doi.org/10.2118/119369-MS.
Parameters
----------
qi: float
The initial production rate in units of ``volume / day``.
tau: float
The tau parameter in units of ``day ** n``. Equivalent to:
.. math::
\\tau = D^n
n: float
The ``n`` exponent.
"""
qi: float
tau: float
n: float
validate_params: Iterable[bool] = field(default_factory=lambda: [True] * 3)
def _qfn(self, t: NDFloat) -> NDFloat:
qi = self.qi
tau = self.tau
n = self.n
return qi * np.exp(-(t / tau) ** n)
def _Nfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
qi = self.qi
tau = self.tau
n = self.n
return qi * tau / n * gammainc(1.0 / n, (t / tau) ** n)
def _Dfn(self, t: NDFloat) -> NDFloat:
tau = self.tau
n = self.n
return n * tau ** -n * t ** (n - 1.0)
def _Dfn2(self, t: NDFloat) -> NDFloat:
tau = self.tau
n = self.n
return n * (n - 1.0) * tau ** -n * t ** (n - 2.0)
def _betafn(self, t: NDFloat) -> NDFloat:
tau = self.tau
n = self.n
return n * tau ** -n * t ** n
def _bfn(self, t: NDFloat) -> NDFloat:
tau = self.tau
n = self.n
return (1.0 - n) / n * tau ** n * t ** -n
[docs] @classmethod
def get_param_descs(cls) -> List[ParamDesc]:
return [
ParamDesc(
'qi', 'Initial rate [vol/day]',
0.0, None,
lambda r, n: r.uniform(1e-10, 1e6, n)),
ParamDesc(
'tau', 'tau',
1e-10, 1e4,
lambda r, n: r.uniform(1e-10, 1e4, n)),
ParamDesc(
'n', 'SE exponent',
1e-10, 1.0,
lambda r, n: r.uniform(1e-10, 1.0, n),
exclude_upper_bound=True),
]
[docs]@dataclass(frozen=True)
class Duong(PrimaryPhase):
"""
Duong Model
Duong, A. N. 2001. Rate-Decline Analysis for Fracture-Dominated
Shale Reservoirs. SPE Res Eval & Eng 14 (3): 377–387. SPE-137748-PA.
https://doi.org/10.2118/137748-PA.
Parameters
----------
qi: float
The initial production rate in units of ``volume / day`` *defined at ``t=1 day``*.
a: float
The ``a`` parameter. Roughly speaking, controls slope of the :func:``q(t)`` function.
m: float
The ``m`` parameter. Roughly speaking, controls curvature of the:func:``q(t)``
function.
"""
qi: float
a: float
m: float
validate_params: Iterable[bool] = field(default_factory=lambda: [True] * 3)
def _qfn(self, t: NDFloat) -> NDFloat:
qi = self.qi
a = self.a
m = self.m
return np.where(t == 0.0, 0.0,
qi * t ** -m * np.exp(a / (1.0 - m) * (t ** (1.0 - m) - 1.0)))
def _Nfn(self, t: NDFloat, **kwargs: Any) -> NDFloat:
qi = self.qi
a = self.a
m = self.m
return np.where(t == 0.0, 0.0, qi / a * np.exp(a / (1.0 - m) * (t ** (1.0 - m) - 1.0)))
def _Dfn(self, t: NDFloat) -> NDFloat:
a = self.a
m = self.m
# alternative form: D = m * t ** -1.0 - a * t ** -m
return m / t - a * t ** -m
def _Dfn2(self, t: NDFloat) -> NDFloat:
a = self.a
m = self.m
# alternative form: D = m * t ** -1.0 - a * t ** -m
return -m / (t * t) + m * a * t ** (-m - 1.0)
def _betafn(self, t: NDFloat) -> NDFloat:
a = self.a
m = self.m
return m - a * t ** (1.0 - m)
def _bfn(self, t: NDFloat) -> NDFloat:
a = self.a
m = self.m
Denom = a * t - m * t ** m
return np.where(
Denom == 0.0, 0.0, m * t ** m * (t ** m - a * t) / (Denom * Denom))
[docs] @classmethod
def get_param_descs(cls) -> List[ParamDesc]:
return [
ParamDesc(
'qi', 'Initial rate [vol/day]',
0.0, None,
lambda r, n: r.uniform(1.0, 2e4, n)),
ParamDesc(
'a', 'a',
1.0, None,
lambda r, n: r.uniform(1.0, 10.0, n)),
ParamDesc(
'm', 'm',
1.0, None,
lambda r, n: r.uniform(1.0, 10.0, n),
exclude_lower_bound=True)
]