1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# -*- mode: python -*-
#
# Aaron LI
# Created: 2016-07-10
# Updated: 2016-07-14
#
# Change logs:
# 2016-07-14:
# * Workaround the spline fitting error when give constant input y data
# 2016-07-13:
# * Improve the `np.array` usage a bit
#
"""
This module implements the spline functions used to interpolate
and/or extrapolate a group of discrete data.
* smoothing spline: R's mgcv::gam()
"""
import sys
import numpy as np
import rpy2.robjects as ro
from rpy2.robjects.packages import importr
from rpy2.rinterface import RRuntimeError
class Spline:
"""
Meta class to fit a spline to the input data.
"""
# input data
x = None
y = None
# weights of each data point when fitting the spline
weights = None
# specifies which axis should be transformed to logarithmic scale
log10 = None
# fitted spline for interpolation and extrapolation
spline = None
def __init__(self, x, y, weights=None):
self.x = np.array(x)
self.y = np.array(y)
if weights is None:
self.weights = np.ones(self.x.shape)
else:
self.weights = np.array(weights)
self.log10 = []
def fit(self, log10=[]):
"""
The parameter `log10` specifies the axis to be transformed
to the logarithmic scale before fitted by the spline, e.g.,
`log10=["x"]`, `log10=["x", "y"]`.
"""
raise NotImplementedError
def eval(self, x):
"""
Evaluate the specified spline at the supplied positions.
Also check whether the spline was fitted in the log-scale space,
and transform the evaluated values if necessary.
"""
raise NotImplementedError
class SmoothSpline(Spline):
"""
Fit a smoothing spline to the data.
Currently, the penalized smoothing spline from R's `mgcv::gam()`
is employed. In addition, the fitted spline allows extrapolation.
"""
mgcv = importr("mgcv")
def fit(self, log10=[]):
self.log10 = list(map(str.upper, log10))
if "X" in self.log10:
x = ro.FloatVector(np.log10(self.x))
else:
x = ro.FloatVector(self.x)
if "Y" in self.log10:
y = ro.FloatVector(np.log10(self.y))
else:
y = ro.FloatVector(self.y)
weights = ro.FloatVector(self.weights)
try:
self.spline = self.mgcv.gam(
ro.Formula("y ~ s(x, bs='ps')"),
data=ro.DataFrame({"x": x, "y": y}),
weights=weights, method="REML")
except RRuntimeError:
# NOTE:
# If the input y data is constant (e.g., the temperature profile
# has only ONE data point), then above smoothing spline fitting
# using method "REML" will failed with error:
# Error in gam.reparam(UrS, sp, grderiv) :
# NA/NaN/Inf in foreign function call (arg 3)
# or with error:
# Error in eigen(hess1, symmetric = TRUE) :
# infinite or missing values in 'x'
print("WARNING: 'mgcv.gam()' using method 'REML' failed!",
file=sys.stderr)
self.spline = self.mgcv.gam(
ro.Formula("y ~ s(x, bs='ps', sp=0.6)"),
data=ro.DataFrame({"x": x, "y": y}),
weights=weights)
def eval(self, x):
x = np.array(x, ndmin=1)
if "X" in self.log10:
x_new = ro.ListVector({"x": ro.FloatVector(np.log10(x))})
else:
x_new = ro.ListVector({"x": ro.FloatVector(x)})
y_pred = self.mgcv.predict_gam(self.spline, newdata=x_new)
if "Y" in self.log10:
y_pred = 10 ** np.array(y_pred)
else:
y_pred = np.array(y_pred)
#
if len(y_pred) == 1:
return y_pred[0]
else:
return y_pred
|