Issue
Working with a 2D signal/time series dataset, after finding peaks and troughs, I would like to scale each section of the dataset appropriately.
For example, if I have the following visual dataset, with peaks and troughs labeled as such:
...what's a good "pythonic" way to label every remaining data point between each peak and trough with a number > -1 and < 1, sort of like so:
I have provided reproducible code below to experiment with.
NOTE: I'm running Windows 10, Python 3.10.5.
pip install findpeaks
from numpy import array, inf, nan, where
# pip install findpeaks
from findpeaks import findpeaks
from random import gauss, seed
from math import sqrt, exp
# ------------------------------------------------------------------------------------------------ #
# GENERATE RANDOM SIGNAL DATA #
# ------------------------------------------------------------------------------------------------ #
# https://towardsdatascience.com/create-a-stock-price-simulator-with-python-b08a184f197d
def create_GBM(s0, mu, sigma):
    """
    Generates a price following a geometric Brownian motion process based on the input of the arguments:
    - s0: Asset initial price.
    - mu: Interest rate expressed in annual terms.
    - sigma: Volatility expressed in annual terms.
    """
    st = s0
    def generate_value():
        nonlocal st
        st *= exp((mu - 0.5 * sigma ** 2) * (1. / 365.) + sigma * sqrt(1./365.) * gauss(mu=0, sigma=1))
        return st
    return generate_value
gbm = create_GBM(100, 0.001, 1.0)
signal = [round(gbm(), 2) for _ in range(10000)]
print(signal)
# ------------------------------------------------------------------------------------------------ #
# FIND PEAKS AND TROUGHS DATAFRAME #
# ------------------------------------------------------------------------------------------------ #
print("Finding peaks/troughs....")
fp = findpeaks(method='peakdetect')
results = fp.fit(array(signal).flatten())
results_df = results['df']
results_df['label'] = where(results_df['valley'], -1,
                            where(results_df['peak'], 1, nan))
print(results_df)
# ------------------------------------------------------------------------------------------------ #
# FILL NAN's WITH THEIR APPROPRIATE VALUES, SCALED BETWEEN -1 and 1 #
# ------------------------------------------------------------------------------------------------ #
# ????????????????????????????
Given that results_df gives the y values, along with the x indexes of where they are, I was hoping there'd be a one-liner for this.
Another thought I had would be to iterate through results_df, peak to trough, then trough to peak (repeat), and MinMaxScale everything between the start and end of each section, as we know what those values are. Something like:
UPDATE
I have a hacky solution here, HOWEVER IT'S NOT WORKING! So treat it as pseudo-code for now, but it looks like this so far. I feel there's an easier way...
# ------------------------------------------------------------------------------------------------ #
# FILL NAN's WITH THEIR APPROPRIATE VALUES, SCALED BETWEEN -1 and 1 #
# ------------------------------------------------------------------------------------------------ #
# Drop nan's from label column to make things easier for iteration
results_df = results_df.dropna()
print(results_df)
# Iterate through the results_df, starting at 1, not 0
for i in range(1, len(results_df)):
    # Find the current values for this "section" of the signal dataset
    if results_df['label'].iloc[i] > 0:
        peak_value = results_df['y'].iloc[i]
        peak_value_index = results_df['x'].iloc[i]
        trough_value = results_df['y'].iloc[i-1]
        trough_value_index = results_df['x'].iloc[i-1]
    else:
        peak_value = results_df['y'].iloc[i-1]
        peak_value_index = results_df['x'].iloc[i-1]
        trough_value = results_df['y'].iloc[i]
        trough_value_index = results_df['x'].iloc[i]
    # Find the current min value
    current_min_value = min(peak_value, trough_value)
    # Find the difference between the max and min values
    current_difference = max(peak_value, trough_value) - min(peak_value, trough_value)
    # Now iterate through that "section" of the signal list, and scale accordingly
    for j in range(min(peak_value_index, trough_value_index), max(peak_value_index, trough_value_index)+1): # +1 to ensure last datapoint isn't missed
        signal[j] = (signal[j] - current_min_value) / current_difference - 1
# Inspect the newly scaled signals at the peak/trough points to ensure they're correct
for i in range(0, len(results_df)):
    print(signal[results_df['x'].iloc[i]])
Solution
My code can be found below. There are two remarks:
- My implementation is a variation on your approach with two notable differences. First, I find the indices of the segment boundaries once, outside of the for-loop, and then iterate directly over the segments. Second, your transformation seems to be missing a factor of 2. That is, I take transformation = -1 + 2 * (value - min) / (max - min), which maps value = min to -1 and value = max to +1.
- I also added some code to plot the original series and its transformation together. This allows us to visually check whether the transformation was successful. In general, the transformation seems to be working, but occasionally the peak detection algorithm misses a peak/trough. The transformation then receives the wrong min or max for that segment, and its result is no longer guaranteed to lie in the [-1, 1] interval.
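To make both remarks concrete, here is a minimal sketch on a made-up five-point segment. The helper name `scale_segment` and the sample values are illustrative assumptions, not part of the question's code or of findpeaks. It compares the formula from the question with the one that includes the factor of 2, and shows what happens when a value lies above the detected "peak".

```python
import numpy as np

def scale_segment(values, lo, hi):
    """Linearly map lo -> -1 and hi -> +1 (hypothetical helper for illustration)."""
    values = np.asarray(values, dtype=float)
    return -1.0 + 2.0 * (values - lo) / (hi - lo)

# Made-up trough-to-peak run: trough at 10, peak at 20
segment = np.array([10.0, 12.0, 15.0, 18.0, 20.0])
lo, hi = segment[0], segment[-1]

# Formula from the question: the peak ends up at 0 instead of +1
without_factor = (segment - lo) / (hi - lo) - 1.0

# Formula with the factor of 2: the trough maps to -1, the peak to +1
with_factor = scale_segment(segment, lo, hi)

# If a true peak was missed, a value above the detected "peak" leaves [-1, 1]
overshoot = scale_segment([22.0], lo, hi)

print(without_factor)
print(with_factor)
print(overshoot)
```

If values outside [-1, 1] are unacceptable downstream, one option (again an assumption about what you want) is to clip the result with `np.clip(scaled, -1, 1)`, at the cost of hiding the missed extremum.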
#!/usr/bin/env python3
from numpy import argwhere, array, inf, isnan, nan, transpose, where, zeros
# pip install findpeaks
from findpeaks import findpeaks
from random import gauss, seed
from math import sqrt, exp
import matplotlib.pyplot as plt
# ------------------------------------------------------------------------------------------------ #
# GENERATE RANDOM SIGNAL DATA #
# ------------------------------------------------------------------------------------------------ #
# https://towardsdatascience.com/create-a-stock-price-simulator-with-python-b08a184f197d
def create_GBM(s0, mu, sigma):
    """
    Generates a price following a geometric Brownian motion process based on the input of the arguments:
    - s0: Asset initial price.
    - mu: Interest rate expressed in annual terms.
    - sigma: Volatility expressed in annual terms.
    """
    st = s0
    def generate_value():
        nonlocal st
        st *= exp((mu - 0.5 * sigma ** 2) * (1. / 365.) + sigma * sqrt(1./365.) * gauss(mu=0, sigma=1))
        return st
    return generate_value
gbm = create_GBM(100, 0.001, 1.0)
signal = [round(gbm(), 2) for _ in range(10000)]
print(signal)
# ------------------------------------------------------------------------------------------------ #
# FIND PEAKS AND TROUGHS DATAFRAME #
# ------------------------------------------------------------------------------------------------ #
print("Finding peaks/troughs....")
fp = findpeaks(method='peakdetect')
results = fp.fit(array(signal).flatten())
results_df = results['df']
results_df['label'] = where(results_df['valley'], -1,
                            where(results_df['peak'], 1, nan))
print(results_df)
# ------------------------------------------------------------------------------------------------ #
# FILL NAN's WITH THEIR APPROPRIATE VALUES, SCALED BETWEEN -1 and 1 #
# ------------------------------------------------------------------------------------------------ #
# Convert some results to numpy arrays
label = results_df["label"].to_numpy()
y = transpose(results_df["y"].to_numpy())
# Indices of the beginnings and ends of segments (rows where a peak or trough was found)
indices = argwhere(~isnan(label))
# Initialize output (points outside any segment stay at 0)
signal = zeros( (len(results_df),1) )
# Compute signal for all segments
for segment in range(1,len(indices)):
    # Indices of current segment
    start_index = indices[segment-1][0]
    end_index = indices[segment][0]
    # y values at both ends of the segment
    yvalue_start = y[start_index]
    yvalue_end = y[end_index]
    # Determine trough and peak values
    if yvalue_start<yvalue_end:
        trough_value = yvalue_start
        peak_value = yvalue_end
    else:
        trough_value = yvalue_end
        peak_value = yvalue_start
    current_difference = peak_value-trough_value
    # Inform user
    print("Segment {} from index {} to {} with trough={} and peak={}".format(segment, start_index, end_index, trough_value, peak_value))
    # Scale the segment so that the trough maps to -1 and the peak to +1
    signal[start_index:(end_index+1), 0] = -1.0 + (2/current_difference) * (y[start_index:(end_index+1)]-trough_value)
fig, axs = plt.subplots(2, 1)
axs[0].plot(y)
axs[0].set_title('Original series')
axs[1].plot(signal)
axs[1].set_title('Converted signal')
plt.show()
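Since the question asked for something closer to a one-liner, here is a hedged sketch of a loop-free variant using pandas forward/backward fill. The function name `scale_between_extrema` is hypothetical, and the toy `y`/`label` arrays are made up. It assumes the same convention as above: `label` is -1 at troughs, +1 at peaks, and NaN elsewhere.

```python
import numpy as np
import pandas as pd

def scale_between_extrema(y, label):
    """Scale each point between its neighbouring extrema to [-1, 1].

    y     : sequence of values
    label : -1 at troughs, +1 at peaks, NaN elsewhere
    Hypothetical helper; points before the first or after the last
    extremum are returned as NaN.
    """
    y = pd.Series(np.asarray(y, dtype=float))
    label = pd.Series(np.asarray(label, dtype=float))

    # Keep y only at the extrema, then spread those values left and right
    at_extrema = y.where(label.notna())
    prev_val = at_extrema.ffill()   # extremum value at or before each point
    next_val = at_extrema.bfill()   # extremum value at or after each point

    lo = np.minimum(prev_val, next_val)
    hi = np.maximum(prev_val, next_val)
    scaled = -1.0 + 2.0 * (y - lo) / (hi - lo)

    # At the extrema themselves lo == hi (0/0 -> NaN), so use the label directly
    scaled = scaled.where(label.isna(), label)
    return scaled.to_numpy()

# Toy data: trough at index 1, peak at index 5, trough at index 7
y_toy = [5, 1, 2, 3, 4, 5, 3, 1]
label_toy = [np.nan, -1, np.nan, np.nan, np.nan, 1, np.nan, -1]
scaled_toy = scale_between_extrema(y_toy, label_toy)
print(scaled_toy)
```

With the dataframe built above, this would presumably be called as `scale_between_extrema(results_df["y"], results_df["label"])`. Unlike the loop version, it leaves points outside the first and last extremum as NaN rather than 0, and it shares the same caveat: a missed peak or trough can push values outside [-1, 1].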
Answered By - Hanno Reuvers