'''Functions to calculate the precipitation type.
For a point or numpy arrays
'''
import numpy as np
import gdal
import osr
from pypros.psychrometrics import ttd2tw
from pypros.psychrometrics import get_tw_sadeghi
from pypros.ros_methods import calculate_koistinen_saltikoff
from pypros.ros_methods import calculate_single_threshold
from pypros.ros_methods import calculate_linear_transition
from pypros.ros_methods import calculate_dual_threshold
[docs]class PyPros:
"""
Main project class. Discriminates precipitation type considering
different methodologies using surface observations.
"""
[docs] def __init__(self, variables_file, method='ks', threshold=None,
data_format=None):
"""
Args:
variables_file (str, list): The file paths containing air
temperature, dew point
temperature and (digital elevation
model) fields.
method (str): The precipitation type discrimination
method to use. Defaults to ks.
Available:
- ks : Koistinen and Saltikoff method
- single_tw: A single wet bulb temperature
threshold
- single_ta: A single air temperature threshold
- dual_tw : A dual wet bulb temperature thresholds
- dual_ta : A dual air temperature threshold
- linear_tr: Linear transition between rain
and snow
threshold (float, list): Threshold value(s) to use in the
different methods available.
Defaults to:
- static_tw: 1.5
- static_ta: 0.0
- linear_tr: [0, 3]
data_format (dict, optional): Defaults to None. The order of the
variables in the variables files.
Defaults to:
{'vars_files': ['tair',
'tdew',
'dem']}
Raises:
ValueError: Raised when the method is not valid
"""
if data_format is None:
self.data_format = {'vars_files': ['tair', 'tdew', 'dem']}
else:
self.data_format = data_format
if threshold is None:
if method == 'static_ta':
self.threshold = 0
elif method == 'static_tw':
self.threshold = 1.5
elif method == 'linear_tr' or method == 'dual_ta':
self.threshold = [0, 3]
elif method == 'dual_tw':
self.threshold = [0.7, 1.0]
elif method == 'ks':
None
else:
raise ValueError('Non valid method. Valid values are ks and ' +
'static_tw, static_ta and linear_tr')
else:
if method == 'single_ta' or method == 'single_tw':
if not type(threshold) == float:
raise ValueError('The threshold for the method {} must ' +
'be a float'.format(method))
elif (method == 'linear_tr' or method == 'dual_ta'
or method == 'dual_tw'):
if (not (type(threshold) == list or type(threshold) == tuple)
or len(threshold) != 2):
raise ValueError('The thresholds for the method {} must ' +
'be a list/tuple of length ' +
'two'.format(method))
elif method == 'ks':
None
else:
raise ValueError('Non valid method. Valid values are ks and ' +
'single_tw, single_ta and linear_tr')
self.threshold = threshold
self.__read_variables_files__(variables_file)
self.method = method
tair = self.variables[self.data_format['vars_files'].index('tair')]
tdew = self.variables[self.data_format['vars_files'].index('tdew')]
if method == 'ks':
self.result = calculate_koistinen_saltikoff(tair, tdew)
elif method == 'single_tw' or method == 'dual_tw':
try:
dem = self.variables[
self.data_format['vars_files'].index('dem')]
except ValueError:
print('Since no DEM is supplied, wet bulb temperature ' +
'calculations will assume a constant pressure of ' +
'1013.25 hPa.')
twet = ttd2tw(tair, tdew)
else:
twet = get_tw_sadeghi(tair, tdew, dem)
if method == 'single_tw':
self.result = calculate_single_threshold(twet, self.threshold)
elif method == 'dual_tw':
self.result = calculate_dual_threshold(twet, self.threshold[0],
self.threshold[1])
elif method == 'single_ta':
self.result = calculate_single_threshold(tair, self.threshold)
elif method == 'linear_tr':
self.result = calculate_linear_transition(tair, self.threshold[0],
self.threshold[1])
elif method == 'dual_ta':
self.result = calculate_dual_threshold(tair, self.threshold[0],
self.threshold[1])
def __read_variables_files__(self, variables_file):
if isinstance(variables_file, (list,)):
self.variables = None
for layer_file in variables_file:
d_s = gdal.Open(layer_file)
if d_s is None:
raise FileNotFoundError("[Errno 2] No such file or " +
"directory: 'BadFile'")
for i in range(d_s.RasterCount):
layer_data = d_s.GetRasterBand(i + 1)\
.ReadAsArray()[np.newaxis, :, :]
if self.variables is None:
self.variables = layer_data
else:
try:
self.variables = np.concatenate((self.variables,
layer_data),
axis=0)
except Exception:
raise ValueError('Variables fields must have the' +
' same shape.')
else:
d_s = gdal.Open(variables_file)
self.variables = d_s.ReadAsArray()
self.out_proj = osr.SpatialReference()
self.out_proj.ImportFromWkt(d_s.GetProjection())
self.size = (d_s.RasterYSize, d_s.RasterXSize)
self.geotransform = d_s.GetGeoTransform()
d_s = None
[docs] def save_file(self, field, file_name):
"""Saves the calculate field data into a file
Args:
file_name (str): The output file path
"""
driver = gdal.GetDriverByName('GTiff')
d_s = driver.Create(file_name, self.size[1], self.size[0], 1,
gdal.GDT_Float32)
d_s.SetGeoTransform(self.geotransform)
d_s.SetProjection(self.out_proj.ExportToWkt())
d_s.GetRasterBand(1).WriteArray(field)
[docs] def refl_mask(self, refl):
"""Calculates the precipitation type masked. The output classification
is as follows:
rain
- 1dBZ : 1
- 5dBZ: 2
- 10dBZ : 3
- 15dBZ: 4
- 25dBZ : 5
sleet
- 1dBZ: 6
- 5dBZ : 7
- 10dBZ: 8
- 15dBZ : 9
- 25dBZ: 10
snow
- 1dBZ : 11
- 5dBZ: 12
- 10dBZ : 13
- 15dBZ: 14
- 25dbZ: 15
Args:
refl (numpy.array): Array with reflectivity values
Raises:
IndexError: Raised if the types don't match in size or type
Returns:
float, numpy array: The precipitation type classification value
"""
if self.result.shape != refl.shape:
raise IndexError('Variables fields must have the' +
' same shape.')
refl_bins = np.array([1, 5, 10, 15, 25])
refl_class = np.digitize(refl, refl_bins)
if self.method == 'ks' or self.method == 'linear_tr':
prob_bins = np.array([0.0, 0.3, 0.7])
ks_class = np.digitize(self.result, prob_bins) - 1
pros = (refl_class + ks_class * 5) * (refl >= 1)
elif self.method == 'single_tw' or self.method == 'single_ta':
rain = np.digitize(self.result, np.array([1]))
pros = (refl_class + rain * 10) * (refl >= 1)
elif self.method == 'dual_tw' or self.method == 'dual_ta':
prob_bins = np.array([0.0, 0.5, 1])
dual_class = np.digitize(self.result, prob_bins) - 1
pros = (refl_class + dual_class * 5) * (refl >= 1)
return pros