import warnings
import math
import time
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
from scipy import stats
warnings.filterwarnings("ignore")
titles_font_style = dict(family='Roboto Condensed', size=14, color='rgb(67, 67, 67)')
values_font_style = dict(family='Roboto Condensed', size=14, color='rgb(248, 248, 255)')
plot_background_color='rgb(243, 243, 243)'
paper_background_color='rgb(243, 243, 243)'
fill_color='rgba(255, 144, 14, 1)'
light_fill_color='rgba(255, 144, 14, 0.5)'
line_color='rgba(193, 104, 0, 1)'
line_color2='rgba(93, 164, 214, 1)'
line_color3='rgba(255, 0, 0, 1)'
grid_line_color='rgba(200, 200, 200, 0.5)'
rolling_factor = 20
Análisis de los tiempos de fabricación de un elemento, haciendo uso de modelos de autorregresión. Se emplea la metodología Box-Jenkins, considerando los siguientes bloques:
La serie está compuesta por un conjunto de observaciones ordenadas de la variable timesheet_incurred, que representa el tiempo de fabricación, en segundos, que se ha consumido en la ejecución de una determinada operación sobre un elemento.
Con una simple descriptiva se observa una gran amplitud en la distribución de los valores, sobre todo teniendo en cuenta la naturaleza de los mismos.
data = pd.read_json('example.json')
data = data.set_index(['index'])
data.describe().T
fig = go.Figure()
fig.add_trace(go.Histogram(x=data['timesheet_incurred'], name="Incurred Time", marker_color=fill_color, xbins=dict(start=0,size=100)))
fig.update_layout(
title="Incurred Time Freq",
xaxis=dict(
title='Time (seconds)',
showgrid=True,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Count',
showgrid=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
bargap=0.1)
fig.show()
fig = go.Figure()
fig.add_trace(go.Box(x=data['timesheet_incurred'], boxpoints='all', name='', jitter=0.5, whiskerwidth=0.2, fillcolor=light_fill_color, line_color=line_color, marker_size=4, line_width=1))
fig.update_layout(
xaxis=dict(
showgrid=True,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
showlegend=False)
fig.show()
En los gráficos de distribución se observa cláramente cómo la variable tiene valores muy dispersos, pero al mismo tiempo concentrados alrededor de los intervalos (0, 1000) y (5000, 7000).
fig = go.Figure()
fig.add_trace(go.Scatter(y=data['timesheet_incurred'], mode='lines', name="Incurred Time", line=dict(color=line_color, width=1)))
fig.add_trace(go.Scatter(y=data["timesheet_incurred"].rolling(window=rolling_factor).mean(), mode='lines', name=f"MA{rolling_factor}", line=dict(color=line_color2, width=1)))
fig.update_layout(
title="Incurred Time Values",
xaxis=dict(
title='Observations',
showgrid=True,
showline=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Value',
showgrid=False,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
)
fig.show()
En la gráfica de valores se observa una clara diferencia en la distribución de los mismos a partir de la observación 109. Esto puede ser debido a un cambio en el proceso de producción que haya afectado cláramente al tiempo de fabricación del elemento.
Como se trata del análisis de una serie temporal se tendrán en cuenta sólo aquellos que se observan con el nuevo comportamiento. En este caso la distribución de frecuencias queda como:
data_0, data = data[0:109], data[109:]
fig = go.Figure()
fig.add_trace(go.Histogram(x=data['timesheet_incurred'], name="Incurred Time", marker_color=fill_color, xbins=dict(start=0,size=100)))
fig.update_layout(
title="Incurred Time Freq",
xaxis=dict(
title='Time (seconds)',
showgrid=True,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Count',
showgrid=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
bargap=0.1)
fig.show()
fig = go.Figure()
fig.add_trace(go.Box(x=data['timesheet_incurred'], boxpoints='all', name='', jitter=0.5, whiskerwidth=0.2, fillcolor=light_fill_color, line_color=line_color, marker_size=4, line_width=1))
fig.update_layout(
xaxis=dict(
showgrid=True,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color)
fig.show()
fig = go.Figure()
fig.add_trace(go.Scatter(y=data['timesheet_incurred'], mode='lines', name="Incurred Time", line=dict(color=line_color, width=1)))
fig.add_trace(go.Scatter(y=data["timesheet_incurred"].rolling(window=rolling_factor).mean(), mode='lines', name=f"MA{rolling_factor}", line=dict(color=line_color2, width=1)))
fig.update_layout(
title="Incurred Time Values",
xaxis=dict(
title='Observations',
showgrid=True,
showline=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Value',
showgrid=False,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
)
fig.show()
Se observan algunos outliers pero la variable está más centrada. Antes de continuar con el estudio de los modelos ARIMA para esta serie se comprueba su normalidad:
stats.normaltest(data["timesheet_incurred"])
resultados que indican que los valores se ajustan adecuadamente a una normal. Además, de lleva a cabo el test aumentado de Dickey-Fuller para comprobar la estacionariedad de la serie.
Este test contrasta la hipótesis de NO estacionariedad frente a la alternativa de que si existe dicha condición.
adfuller_results = adfuller(data["timesheet_incurred"])
print(f'ADF Statistic: {adfuller_results[0]}')
print(f'p-value: {adfuller_results[1]}')
tal como muestran los resultados hay evidencias suficientes para rechazar la hipótesis de que la serie es No estacionaria.
Por último, el siguiente gráfico muestra la relación de orden uno en esta serie, es decir, cómo los valores de la misma se relacionan con el valor inmediatamente anterior (lag=1). En el gráfico se observan los valores outliers de antes, pero también que el resto de puntos se agrupan en una forma determinada. Este comportamiento también es indicativo de que la serie se puede estudiar mediante análisis autoregresivo.
fig = go.Figure()
fig.add_trace(go.Scatter(y=data['timesheet_incurred'], x=data['timesheet_incurred'].shift(1), mode='markers', name="Incurred Time", line=dict(color=line_color, width=1)))
fig.update_layout(
xaxis=dict(
title='y(t)',
showgrid=True,
showline=False,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='y(t-1)',
showgrid=True,
showline=False,
zeroline=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
height=600,
width=600,)
fig.show()
Existen métodos para la estimación de los parámetros del modelo ARIMA basados en la aplicación de algunos tests, pero quizás el más extendido y fácil de usar es el que consiste en calcular y minimizar el error cuadrático medio obtenido de diferentes combinaciones de los parámetros p, q y d del modelo.
En nuestro ejemplo, probando diferentes valores, se obtiene que la combinación con menor MSE es (4,0,20)
def fit_the_model(data, order):
model = ARIMA(data, order=order).fit()
error = mean_squared_error(data, model.predict())
return model, error
p_values = [1, 2, 4, 6, 8, 10]
d_values = [0]
q_values = [0, 5, 10, 20]
best_sme, best_sme_test, best_order, best_model = float("inf"), float("inf"), None, None
train_size = len(data) - 20
start = time.time()
for p in p_values:
for d in d_values:
for q in q_values:
order = (p,d,q)
train, test = [x for x in data[0:train_size]['timesheet_incurred']], [x for x in data[train_size:]['timesheet_incurred']]
try:
predictions = list()
for t in range(len(test)):
model, mse = fit_the_model(train, order)
predictions.append(model.forecast()[0])
train.append(test[t])
mse_test = mean_squared_error(test, predictions)
print(f'ARIMA{order} AIC={model.aic} MSE={mse} MSETest={mse_test}')
if mse_test < best_sme_test:
best_sme, best_sme_test, best_order, best_model = mse, mse_test, order, model
except:
print(f'ARIMA{order} Error')
continue
elapsed_time = time.time() - start
if best_model:
print(f'Tiempo dedicado: {elapsed_time} sec.')
print(f'Mejor configuración: ARIMA{best_order} AIC={best_model.aic} MSE={best_sme} MSETest={best_sme_test}')
Para comprobar la validez del modelo se lleva a cabo el análisis de los resíduos para determinar si se trata de ruído blanco.
En primer lugar se aplica un test de normalidad sobre los residuos,
stats.normaltest(best_model.resid)
del cual se desprende que se rechaza la hipótesis de que sigan una distribución normal. Esto es indicativo de que no todo el comportamiento de la serie queda explicado por el modelo. Esto puede ser debido a que la serie sufre muchas anomalías, aunque existe correlación entre los valores.
En segundo lugar, se aplica el test de Durbin-Watson para contrastar la existencia de correlación en los resíduos del modelo,
sm.stats.durbin_watson(best_model.resid)
valor muy cercano a 2, lo cual indica la NO existencia de correlación en los resíduos, por lo que damos el modelo como válido.
Haciendo uso del modelo seleccionado se pueden hacer predicciones para los valores próximos de la serie, tal como se observa en el siguiente gráfico.
data['best_prediction'] = np.append([None],best_model.predict())
data["ma"]=data['timesheet_incurred'].rolling(window=rolling_factor).mean()
data["mstd"]=data['timesheet_incurred'].rolling(window=rolling_factor).std()
fig = go.Figure()
fig.add_trace(go.Scatter(y=data['timesheet_incurred'], mode='lines', name="Incurred Time", line=dict(color='gray', width=1)))
fig.add_trace(go.Scatter(y=data['ma'], mode='lines', name=f"MA{rolling_factor}", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['best_prediction'][0:train_size], mode='lines', name="Incurred Time (Train Pred)", line=dict(color='rgba(200,0,0,0.5)', width=2)))
fig.add_trace(go.Scatter(y=np.append([None for x in range(0,train_size)], predictions), mode='lines', name="Incurred Time (Test Pred)", line=dict(color='rgba(200,0,0,0.9)', width=2)))
fig.add_trace(go.Scatter(y=np.append([None for x in range(0,len(data['timesheet_incurred']))], best_model.predict(start=len(data),end=len(data)+9,dynamic=True)), mode='lines', name="Forecast", line=dict(color='rgba(0,200,0,0.5)', width=2)))
fig.update_layout(
title="Incurred Time Values",
xaxis=dict(
title='Observations',
showgrid=True,
showline=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Value',
showgrid=False,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
)
fig.show()
Pero otro beneficio importante de este dipo de análisis sobre una serie temporal es que se puede usar para la detección de anomalías. Haciendo una simple normalización de la serie, se observan outliers dependiendo de la desviación estándard móvil, tal como se muestra en el siguiente gráfico.
data['normalized_timesheet_incurred']=(data['timesheet_incurred']-data['timesheet_incurred'].mean())/data['timesheet_incurred'].std()
data['normalized_ma']=data['normalized_timesheet_incurred'].rolling(window=rolling_factor).mean()
data['normalized_upper_std']=data['normalized_timesheet_incurred'].rolling(window=rolling_factor).mean() + 3 * data['normalized_timesheet_incurred'].rolling(window=rolling_factor).std()
data['normalized_lower_std']=data['normalized_timesheet_incurred'].rolling(window=rolling_factor).mean() - 3 * data['normalized_timesheet_incurred'].rolling(window=rolling_factor).std()
data['outlier']=data.apply(lambda x: x['normalized_timesheet_incurred'] if (x['normalized_timesheet_incurred'] > x['normalized_upper_std'] or x['normalized_timesheet_incurred'] < x['normalized_lower_std']) else None , axis=1)
fig = go.Figure()
fig.add_trace(go.Scatter(y=data['normalized_timesheet_incurred'], mode='lines', name="Incurred Time", line=dict(color=line_color, width=1)))
fig.add_trace(go.Scatter(y=data['normalized_ma'], mode='lines', name=f"MA{rolling_factor}", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['normalized_upper_std'], mode='lines', name=f"MA{rolling_factor}+3std", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['normalized_lower_std'], mode='lines', name=f"MA{rolling_factor}-3std", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['outlier'], mode='markers', name=f"Outliers", marker=dict(color='rgba(255,0,0,0.5)', size=10)))
fig.update_layout(
title="Incurred Time Outliers (Normalized)",
xaxis=dict(
title='Normalized observations',
showgrid=True,
showline=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Value',
showgrid=False,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
)
fig.show()
Sin embargo, estudiando la desviación entre el valor observado y el valor predicho por el modelo, se pueden detectar anomalías de una manera más sensible que el método anterior. El siguiente gráfico señala anomalías detectadas en aquellos valores cuya desviación con respecto a lo estimado supera la desviación estándar móvil para dicho punto.
data['upper_std']=data['timesheet_incurred'].rolling(window=rolling_factor).mean() + 3 * data['timesheet_incurred'].rolling(window=rolling_factor).std()
data['lower_std']=data['timesheet_incurred'].rolling(window=rolling_factor).mean() - 3 * data['timesheet_incurred'].rolling(window=rolling_factor).std()
data['prediction_deviation']=data['timesheet_incurred']-data['best_prediction']
data['arima_anomaly']=data.apply(lambda x: x['timesheet_incurred'] if math.pow(x['prediction_deviation'],2) > math.pow(x['mstd'],2) else None , axis=1)
fig = go.Figure()
fig.add_trace(go.Scatter(y=data['timesheet_incurred'], mode='lines', name="Incurred Time", line=dict(color='gray', width=1)))
fig.add_trace(go.Scatter(y=data['ma'], mode='lines', name=f"MA{rolling_factor}", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['upper_std'], mode='lines', name=f"MA{rolling_factor}+3std", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['lower_std'], mode='lines', name=f"MA{rolling_factor}-3std", line=dict(color=line_color2, width=1)))
fig.add_trace(go.Scatter(y=data['best_prediction'], mode='lines', name="Incurred Time Prediction", line=dict(color='rgba(200,0,0,0.5)', width=2)))
fig.add_trace(go.Scatter(y=data['arima_anomaly'], mode='markers', name=f"Anomalies", marker=dict(color='rgba(255,0,0,0.5)', size=10)))
fig.update_layout(
title="Incurred Time Anomalies (ARIMA)",
xaxis=dict(
title='Observations',
showgrid=True,
showline=False,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
yaxis=dict(
title='Value',
showgrid=False,
zerolinecolor=grid_line_color,
linecolor=grid_line_color,
gridcolor=grid_line_color
),
paper_bgcolor=paper_background_color,
plot_bgcolor=plot_background_color,
)
fig.show()