Running backtest…
Fetching data and generating charts
Ascending Triangle
Pivot detection + triangle filter + breakout entries + ATR risk

Backtest Configuration

Tune triangle detection and breakout rules

Strategy Chart

Candles, pivots, triangle windows, breakout signals

Run backtest to see chart

ROI Performance

Equity curve over time

Initial
Net Profit
Return
Profit Factor
Max Drawdown
No ROI data yet

PnL Distribution

Histogram of trade outcomes

No PnL chart available

Summary Statistics

Total Trades
Long Trades
Short Trades
Win Rate
Avg Win %
Avg Loss %

Performance by Direction

Long
Trades:
Wins:
Losses:
Net PnL:
Avg Win %:
Short
Trades:
Wins:
Losses:
Net PnL:
Avg Win %:

Trade History

Closed positions

# | Side | Entry Time | Exit Time | Entry | Exit | PnL ($) | PnL (%) | Exit Reason
No trades yet
return df.reset_index(drop=True) class PivotDetector: @staticmethod def detect_pivots(df: pd.DataFrame, left: int = 2, right: int = 2) -> pd.DataFrame: out = df.copy() n = len(out) swing_high = np.zeros(n, dtype=bool) swing_low = np.zeros(n, dtype=bool) highs = out["High"].values lows = out["Low"].values for i in range(left, n - right): current_high = highs[i] current_low = lows[i] left_highs = highs[i - left:i] right_highs = highs[i + 1:i + right + 1] left_lows = lows[i - left:i] right_lows = lows[i + 1:i + right + 1] if np.all(current_high > left_highs) and np.all(current_high > right_highs): swing_high[i] = True if np.all(current_low < left_lows) and np.all(current_low < right_lows): swing_low[i] = True out["swing_high"] = swing_high out["swing_low"] = swing_low return out @dataclass class Line: slope: float intercept: float def value_at(self, x: float) -> float: return self.slope * x + self.intercept class TriangleDetector: @staticmethod def fit_line(points): if len(points) < 2: return None x = np.array([p[0] for p in points], dtype=float) y = np.array([p[1] for p in points], dtype=float) m, b = np.polyfit(x, y, 1) return Line(slope=m, intercept=b) @staticmethod def normalized_slope(line: Line, ref_price: float) -> float: if ref_price == 0: return np.nan return line.slope / ref_price @classmethod def detect_ascending_triangle( cls, df: pd.DataFrame, n_pivots: int = 3, flat_high_tolerance: float = 0.0015, rising_low_min_slope: float = 0.0005, min_touches_high: int = 2, min_touches_low: int = 2, max_high_deviation: float = 0.004, ) -> pd.DataFrame: out = df.copy() out["triangle_ready"] = False out["resistance_line"] = np.nan out["support_line"] = np.nan out["high_line_slope"] = np.nan out["low_line_slope"] = np.nan out["pattern_start_idx"] = np.nan high_indices = out.index[out["swing_high"]].tolist() low_indices = out.index[out["swing_low"]].tolist() for i in range(len(out)): recent_high_idxs = [idx for idx in high_indices if idx <= i][-n_pivots:] recent_low_idxs 
= [idx for idx in low_indices if idx <= i][-n_pivots:] highs = [(idx, out.loc[idx, "High"]) for idx in recent_high_idxs] lows = [(idx, out.loc[idx, "Low"]) for idx in recent_low_idxs] if len(highs) < min_touches_high or len(lows) < min_touches_low: continue high_line = cls.fit_line(highs) low_line = cls.fit_line(lows) if high_line is None or low_line is None: continue avg_high = np.mean([p[1] for p in highs]) avg_low = np.mean([p[1] for p in lows]) high_slope_norm = cls.normalized_slope(high_line, avg_high) low_slope_norm = cls.normalized_slope(low_line, avg_low) highs_fitted = np.array([high_line.value_at(x) for x, _ in highs]) highs_actual = np.array([y for _, y in highs]) high_dev = np.max(np.abs(highs_actual - highs_fitted) / avg_high) if ( abs(high_slope_norm) <= flat_high_tolerance and low_slope_norm >= rising_low_min_slope and high_dev <= max_high_deviation ): resistance_now = high_line.value_at(i) support_now = low_line.value_at(i) if support_now < resistance_now: out.loc[i, "triangle_ready"] = True out.loc[i, "resistance_line"] = resistance_now out.loc[i, "support_line"] = support_now out.loc[i, "high_line_slope"] = high_slope_norm out.loc[i, "low_line_slope"] = low_slope_norm out.loc[i, "pattern_start_idx"] = min(highs[0][0], lows[0][0]) return out class BreakoutStrategy: @staticmethod def add_volume_zscore(df: pd.DataFrame, window: int = 20) -> pd.DataFrame: out = df.copy() vol_mean = out["Volume"].rolling(window).mean() vol_std = out["Volume"].rolling(window).std() out["Volume_Z"] = ((out["Volume"] - vol_mean) / vol_std).replace([np.inf, -np.inf], np.nan).fillna(0) return out @staticmethod def add_rsi(df: pd.DataFrame, period: int = 14) -> pd.DataFrame: out = df.copy() delta = out["Close"].diff() gain = delta.clip(lower=0).rolling(period).mean() loss = (-delta.clip(upper=0)).rolling(period).mean() rs = gain / loss.replace(0, np.nan) out["RSI"] = (100 - (100 / (1 + rs))).fillna(50) return out @classmethod def generate_signals( cls, df: pd.DataFrame, 
breakout_buffer: float = 0.001, volume_z_min: float = 0.0, confirm_close: bool = True, allow_shorts: bool = True, one_signal_per_pattern: bool = True, ) -> pd.DataFrame: out = df.copy() out = cls.add_volume_zscore(out) out = cls.add_rsi(out) long_signal = np.zeros(len(out), dtype=bool) short_signal = np.zeros(len(out), dtype=bool) used_pattern_start = set() for i in range(1, len(out)): if not out.loc[i, "triangle_ready"]: continue resistance = out.loc[i, "resistance_line"] support = out.loc[i, "support_line"] if np.isnan(resistance) or np.isnan(support): continue close = out.loc[i, "Close"] high = out.loc[i, "High"] low = out.loc[i, "Low"] vol_z = out.loc[i, "Volume_Z"] pattern_start = out.loc[i, "pattern_start_idx"] if one_signal_per_pattern and not np.isnan(pattern_start) and pattern_start in used_pattern_start: continue up_break = close > resistance * (1 + breakout_buffer) if confirm_close else high > resistance * (1 + breakout_buffer) dn_break = close < support * (1 - breakout_buffer) if confirm_close else low < support * (1 - breakout_buffer) if up_break and vol_z >= volume_z_min: long_signal[i] = True if one_signal_per_pattern and not np.isnan(pattern_start): used_pattern_start.add(pattern_start) elif allow_shorts and dn_break and vol_z >= volume_z_min: short_signal[i] = True if one_signal_per_pattern and not np.isnan(pattern_start): used_pattern_start.add(pattern_start) out["long_signal"] = long_signal out["short_signal"] = short_signal out["signal"] = np.where(long_signal, 1, np.where(short_signal, -1, 0)) return out class BacktestEngine: def __init__( self, df: pd.DataFrame, initial_capital: float = 10000, atr_period: int = 14, atr_mult_sl: float = 1.5, rr_mult_tp: float = 2.0, risk_fraction: float = 1.0, ): self.df = df.copy().reset_index(drop=True) self.initial_capital = initial_capital self.atr_period = atr_period self.atr_mult_sl = atr_mult_sl self.rr_mult_tp = rr_mult_tp self.risk_fraction = risk_fraction self.trades = [] self.equity_curve = [] 
self.final_capital = initial_capital self.total_return = 0.0 def add_atr(self): high = self.df["High"] low = self.df["Low"] close = self.df["Close"] tr1 = high - low tr2 = (high - close.shift()).abs() tr3 = (low - close.shift()).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) self.df["ATR"] = tr.rolling(self.atr_period).mean() self.df = self.df.dropna(subset=["ATR"]).reset_index(drop=True) def run(self): self.add_atr() capital = self.initial_capital position = None qty = 0.0 entry_price = np.nan entry_date = None sl_price = np.nan tp_price = np.nan for i in range(len(self.df)): row = self.df.loc[i] date = row["Date"] high = row["High"] low = row["Low"] close = row["Close"] atr = row["ATR"] signal = row["signal"] if position is not None: exit_price = None exit_reason = None if position == "long": sl_hit = low <= sl_price tp_hit = high >= tp_price if sl_hit and tp_hit: exit_price = sl_price exit_reason = "stop_loss_first_assumed" elif sl_hit: exit_price = sl_price exit_reason = "stop_loss" elif tp_hit: exit_price = tp_price exit_reason = "take_profit" if exit_price is not None: pnl = (exit_price - entry_price) * qty capital += pnl self.trades.append({ "entry_date": entry_date, "exit_date": date, "direction": "BUY", "entry_price": float(entry_price), "exit_price": float(exit_price), "qty": float(qty), "pnl": float(pnl), "pnl_pct": float((exit_price - entry_price) / entry_price * 100), "exit_reason": exit_reason, }) position = None qty = 0.0 elif position == "short": sl_hit = high >= sl_price tp_hit = low <= tp_price if sl_hit and tp_hit: exit_price = sl_price exit_reason = "stop_loss_first_assumed" elif sl_hit: exit_price = sl_price exit_reason = "stop_loss" elif tp_hit: exit_price = tp_price exit_reason = "take_profit" if exit_price is not None: pnl = (entry_price - exit_price) * qty capital += pnl self.trades.append({ "entry_date": entry_date, "exit_date": date, "direction": "SELL", "entry_price": float(entry_price), "exit_price": float(exit_price), "qty": 
float(qty), "pnl": float(pnl), "pnl_pct": float((entry_price - exit_price) / entry_price * 100), "exit_reason": exit_reason, }) position = None qty = 0.0 if position is None and signal != 0: risk_capital = capital * self.risk_fraction qty = risk_capital / close if signal == 1: position = "long" entry_price = close entry_date = date sl_price = entry_price - self.atr_mult_sl * atr tp_price = entry_price + self.atr_mult_sl * atr * self.rr_mult_tp elif signal == -1: position = "short" entry_price = close entry_date = date sl_price = entry_price + self.atr_mult_sl * atr tp_price = entry_price - self.atr_mult_sl * atr * self.rr_mult_tp if position == "long": equity = capital + (close - entry_price) * qty elif position == "short": equity = capital + (entry_price - close) * qty else: equity = capital self.equity_curve.append({ "Date": date, "Equity": float(equity), "Capital": float(capital), }) if position is not None: last = self.df.iloc[-1] last_close = last["Close"] last_date = last["Date"] if position == "long": pnl = (last_close - entry_price) * qty pnl_pct = (last_close - entry_price) / entry_price * 100 direction = "BUY" else: pnl = (entry_price - last_close) * qty pnl_pct = (entry_price - last_close) / entry_price * 100 direction = "SELL" capital += pnl self.trades.append({ "entry_date": entry_date, "exit_date": last_date, "direction": direction, "entry_price": float(entry_price), "exit_price": float(last_close), "qty": float(qty), "pnl": float(pnl), "pnl_pct": float(pnl_pct), "exit_reason": "end_of_data", }) self.final_capital = float(capital) self.total_return = float((capital - self.initial_capital) / self.initial_capital * 100) return self.get_stats() def _max_drawdown(self): eq = pd.DataFrame(self.equity_curve) if eq.empty: return 0.0 eq["peak"] = eq["Equity"].cummax() eq["drawdown"] = (eq["Equity"] - eq["peak"]) / eq["peak"] * 100 return float(eq["drawdown"].min()) def get_stats(self): trades_df = pd.DataFrame(self.trades) equity_df = 
pd.DataFrame(self.equity_curve) if trades_df.empty: return { "total_trades": 0, "total_long_trades": 0, "total_short_trades": 0, "winning_trades": 0, "losing_trades": 0, "win_rate": 0.0, "total_return_%": 0.0, "net_profit": 0.0, "final_capital": float(self.initial_capital), "avg_win_%": 0.0, "avg_loss_%": 0.0, "profit_factor": 0.0, "max_drawdown": 0.0, "trades": trades_df, "equity_curve": equity_df, } winning = trades_df[trades_df["pnl"] > 0] losing = trades_df[trades_df["pnl"] <= 0] return { "total_trades": int(len(trades_df)), "total_long_trades": int((trades_df["direction"] == "BUY").sum()), "total_short_trades": int((trades_df["direction"] == "SELL").sum()), "winning_trades": int(len(winning)), "losing_trades": int(len(losing)), "win_rate": float(len(winning) / len(trades_df) * 100) if len(trades_df) else 0.0, "total_return_%": float(self.total_return), "net_profit": float(self.final_capital - self.initial_capital), "final_capital": float(self.final_capital), "avg_win_%": float(winning["pnl_pct"].mean()) if len(winning) else 0.0, "avg_loss_%": float(losing["pnl_pct"].mean()) if len(losing) else 0.0, "profit_factor": float(winning["pnl"].sum() / abs(losing["pnl"].sum())) if len(losing) and losing["pnl"].sum() != 0 else 0.0, "max_drawdown": float(self._max_drawdown()), "trades": trades_df, "equity_curve": equity_df, } def plot_roi_graph(equity_curve_df, initial_capital, title="Cumulative PnL Over Time"): if equity_curve_df.empty: return None cum_pnl = equity_curve_df["Equity"] - initial_capital pos = cum_pnl.where(cum_pnl >= 0, 0) neg = cum_pnl.where(cum_pnl < 0, 0) fig = go.Figure() fig.add_trace(go.Scatter( x=equity_curve_df["Date"], y=pos, mode="lines", line=dict(color="#00ff9f", width=2.5), fill="tozeroy", fillcolor="rgba(0,255,159,0.2)", name="Profit ($)" )) fig.add_trace(go.Scatter( x=equity_curve_df["Date"], y=neg, mode="lines", line=dict(color="#ff4d4d", width=2.5), fill="tozeroy", fillcolor="rgba(255,77,77,0.2)", name="Loss ($)" )) fig.update_layout( 
title=title, plot_bgcolor="#0b0f1a", paper_bgcolor="#0b0f1a", font=dict(color="white"), xaxis=dict(title="Time", showgrid=True, gridcolor="rgba(255,255,255,0.1)"), yaxis=dict(title="Cumulative PnL ($)", showgrid=True, gridcolor="rgba(255,255,255,0.1)"), hovermode="x unified", height=400, ) return fig def plot_profitloss_graph(trades_df): if trades_df.empty: return None x_vals = [f"T{i+1}" for i in range(len(trades_df))] y_vals = trades_df["pnl"].values colors = ["#10b981" if p >= 0 else "#f43f5e" for p in y_vals] cum = trades_df["pnl"].cumsum() fig = go.Figure() fig.add_trace(go.Bar( x=x_vals, y=y_vals, marker_color=colors, text=[f"${p:.2f}" for p in y_vals], textposition="outside", name="Trade PnL ($)" )) fig.add_trace(go.Scatter( x=x_vals, y=cum, mode="lines+markers", name="Cumulative PnL", line=dict(color="#8b5cf6", width=2.5) )) fig.update_layout( title="Profit & Loss Distribution", plot_bgcolor="#0b0f1a", paper_bgcolor="#0b0f1a", font=dict(color="white"), xaxis=dict(title="Trade Number", showgrid=True, gridcolor="rgba(255,255,255,0.1)"), yaxis=dict(title="PnL ($)", showgrid=True, gridcolor="rgba(255,255,255,0.1)"), hovermode="x unified", height=400, ) return fig def plot_ascending_triangle_chart( df: pd.DataFrame, start_idx: int = 0, end_idx: int = -1, show_volume: bool = True, show_rsi: bool = True, title: str = "Ascending Triangle Breakout Strategy", ): if end_idx == -1: end_idx = len(df) - 1 data = df.iloc[start_idx:end_idx + 1].copy() if data.empty: raise ValueError("Selected chart slice is empty") n_rows = 1 subplot_titles = ["Price"] row_heights = [0.65] if show_volume: n_rows += 1 subplot_titles.append("Volume Z-Score") row_heights.append(0.17) if show_rsi: n_rows += 1 subplot_titles.append("RSI") row_heights.append(0.18) fig = make_subplots( rows=n_rows, cols=1, shared_xaxes=True, vertical_spacing=0.04, row_heights=row_heights, subplot_titles=subplot_titles, ) fig.add_trace( go.Candlestick( x=data["Date"], open=data["Open"], high=data["High"], 
low=data["Low"], close=data["Close"], name="Price", increasing_line_color="#00ff9f", decreasing_line_color="#ff4d4d", ), row=1, col=1, ) swing_highs = data[data["swing_high"]] swing_lows = data[data["swing_low"]] if not swing_highs.empty: fig.add_trace( go.Scatter( x=swing_highs["Date"], y=swing_highs["High"] * 1.001, mode="markers", name="Swing High", marker=dict(symbol="triangle-down", size=9, color="#ff4d4d"), ), row=1, col=1, ) if not swing_lows.empty: fig.add_trace( go.Scatter( x=swing_lows["Date"], y=swing_lows["Low"] * 0.999, mode="markers", name="Swing Low", marker=dict(symbol="triangle-up", size=9, color="#f59e0b"), ), row=1, col=1, ) tri = data[data["triangle_ready"]].copy() if not tri.empty: fig.add_trace( go.Scatter( x=tri["Date"], y=tri["resistance_line"], mode="lines", name="Projected Resistance", line=dict(color="#facc15", width=2, dash="dash"), ), row=1, col=1, ) fig.add_trace( go.Scatter( x=tri["Date"], y=tri["support_line"], mode="lines", name="Projected Support", line=dict(color="#38bdf8", width=2, dash="dash"), ), row=1, col=1, ) long_signals = data[data["long_signal"]] short_signals = data[data["short_signal"]] if not long_signals.empty: fig.add_trace( go.Scatter( x=long_signals["Date"], y=long_signals["Close"], mode="markers", name="Long Breakout", marker=dict(symbol="triangle-up", size=12, color="#00ff9f", line=dict(color="white", width=1)), ), row=1, col=1, ) if not short_signals.empty: fig.add_trace( go.Scatter( x=short_signals["Date"], y=short_signals["Close"], mode="markers", name="Short Breakout", marker=dict(symbol="triangle-down", size=12, color="#ff4d4d", line=dict(color="white", width=1)), ), row=1, col=1, ) current_row = 2 if show_volume: fig.add_trace( go.Bar( x=data["Date"], y=data["Volume_Z"], name="Volume Z", marker_color=["#00ff9f" if z >= 0 else "#ff4d4d" for z in data["Volume_Z"]], opacity=0.7, ), row=current_row, col=1, ) fig.add_hline(y=0, line_dash="dash", line_color="gray", row=current_row, col=1) current_row += 1 if 
show_rsi: fig.add_trace( go.Scatter( x=data["Date"], y=data["RSI"], mode="lines", name="RSI", line=dict(color="#8b5cf6", width=2), ), row=current_row, col=1, ) fig.add_hline(y=70, line_dash="dash", line_color="red", row=current_row, col=1) fig.add_hline(y=30, line_dash="dash", line_color="green", row=current_row, col=1) fig.add_hline(y=50, line_dash="dot", line_color="gray", row=current_row, col=1) fig.update_yaxes(range=[0, 100], row=current_row, col=1) fig.update_layout( title=title, plot_bgcolor="#0b0f1a", paper_bgcolor="#0b0f1a", font=dict(color="white", family="Inter"), legend=dict( bgcolor="rgba(0,0,0,0.4)", orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0, ), hovermode="x unified", height=900, width=1300, ) fig.update_xaxes(rangeslider_visible=False, showgrid=True, gridcolor="rgba(255,255,255,0.1)") fig.update_yaxes(showgrid=True, gridcolor="rgba(255,255,255,0.1)") return fig def run_ascending_triangle_strategy( symbol="BTCUSD", exchange="BITSTAMP", interval="1h", n_bars=1500, start_date=None, end_date=None, initial_capital=10000, pivot_window=2, n_pivots=3, flat_high_tolerance=0.0015, rising_low_min_slope=0.0005, breakout_buffer=0.001, volume_z_min=0.0, allow_shorts=True, atr_mult_sl=1.5, rr_mult_tp=2.0, ): fetcher = DataFetcher( symbol=symbol, exchange=exchange, interval=interval, n_bars=n_bars, ) df = fetcher.fetch(start_date=start_date, end_date=end_date) df = PivotDetector.detect_pivots(df, left=pivot_window, right=pivot_window) df = TriangleDetector.detect_ascending_triangle( df, n_pivots=n_pivots, flat_high_tolerance=flat_high_tolerance, rising_low_min_slope=rising_low_min_slope, min_touches_high=2, min_touches_low=2, max_high_deviation=0.004, ) df = BreakoutStrategy.generate_signals( df, breakout_buffer=breakout_buffer, volume_z_min=volume_z_min, confirm_close=True, allow_shorts=allow_shorts, one_signal_per_pattern=True, ) engine = BacktestEngine( df=df, initial_capital=initial_capital, atr_mult_sl=atr_mult_sl, rr_mult_tp=rr_mult_tp, 
risk_fraction=1.0, ) stats = engine.run() return df, stats