Security violation in requirements.txt: line 5: package ‘Dhan_Trdaehull’ is not in the approved package list. Contact support to request a new package
this is showing when I try to add Dhan_tradehull in dependencies of Dhan DevPortal
Security violation in requirements.txt: line 5: package ‘Dhan_Trdaehull’ is not in the approved package list. Contact support to request a new package
this is showing when I try to add Dhan_tradehull in dependencies of Dhan DevPortal
Hi @dattaswaami ,
we are checking on it, we will update once solved.
If I try it without tradehull,
"""
Supertrend Strategy Live Algorithm for Dhan DevPortal
- Paper trading by default (logs trades)
- Uncomment order placement code to enable live trading
- Uses official dhanhq SDK v2+ with correct methods and constants
"""
import time
import logging
import csv
import json
import urllib.request
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# ------------------------------------------------------------------
# Dhan SDK – correct import and client initialisation
# ------------------------------------------------------------------
from dhanhq import DhanContext, dhanhq
# ------------------------------------------------------------------
# HARD-CODED CREDENTIALS (DevPortal security: no os.getenv)
# ------------------------------------------------------------------
CLIENT_CODE = "your_client_code" # Replace with your actual client code
ACCESS_TOKEN = "your_access_token" # Replace with your actual access token
# ------------------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------------------
INSTRUMENT = "NIFTY"
STRIKE_STEP = 100
LOT_MULTIPLIER = 50
LOTS_PER_LEG = 1
ENTRY_ATR_LENGTH = 10
ENTRY_MULTIPLIER = 3
EXIT_ATR_LENGTH = 7
EXIT_MULTIPLIER = 2
START_TIME = "09:25:00"
LAST_ENTRY_TIME = "15:00:00"
FORCE_EXIT_START = "15:15:00"
FORCE_EXIT_END = "15:20:00"
OFFSETS_LONG_CALLS = [0, 1, 2, 3, 4, 5]
OFFSETS_SHORT_PUTS = [0, -1, -2, -3, -4, -5]
OFFSETS_LONG_PUTS = [0, -1, -2, -3, -4, -5]
OFFSETS_SHORT_CALLS = [0, 1, 2, 3, 4, 5]
BROKERAGE_PER_LOT_PER_ORDER = 20
# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# TRADEBOOK LOGGER
# ------------------------------------------------------------------
class TradebookLogger:
def __init__(self, filename="tradebook.csv"):
self.filename = f"/tmp/{filename}"
self.header = [
"entry_time", "exit_time", "position_type",
"exit_reason", "atm_strike", "total_pnl",
"capital_after", "num_legs", "legs"
]
self._write_header_if_needed()
def _write_header_if_needed(self):
# Security: use try/except instead of os.path.exists
try:
with open(self.filename, 'r') as f:
first_line = f.readline().strip()
expected = ','.join(self.header)
if first_line != expected:
self._write_header()
except (FileNotFoundError, IOError):
self._write_header()
def _write_header(self):
try:
with open(self.filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(self.header)
except Exception as e:
logger.error(f"Failed to write tradebook header: {e}")
def log_trade(self, entry_time, exit_time, position_type, exit_reason, atm_strike, total_pnl, capital_after, legs):
legs_json = json.dumps(legs)
row = [
entry_time.isoformat(),
exit_time.isoformat(),
position_type,
exit_reason,
atm_strike,
round(total_pnl, 2),
round(capital_after, 2),
len(legs),
legs_json
]
try:
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow(row)
except Exception as e:
logger.error(f"Failed to log trade: {e}")
# ------------------------------------------------------------------
# SUPERTREND CALCULATION
# ------------------------------------------------------------------
def calculate_supertrend(df, atr_length=10, multiplier=3, prefix=''):
high = df['high']
low = df['low']
close = df['close']
atr = pd.Series(index=df.index, dtype=float)
for i in range(len(df)):
if i < atr_length:
atr.iloc[i] = 0
else:
tr = max(high.iloc[i] - low.iloc[i],
abs(high.iloc[i] - close.iloc[i-1]),
abs(low.iloc[i] - close.iloc[i-1]))
atr.iloc[i] = (atr.iloc[i-1] * (atr_length - 1) + tr) / atr_length
hl2 = (high + low) / 2
upper_band = hl2 + (multiplier * atr)
lower_band = hl2 - (multiplier * atr)
st = pd.Series(index=df.index, dtype=float)
direction = pd.Series(index=df.index, dtype=int)
for i in range(len(df)):
if i < atr_length:
st.iloc[i] = 0
direction.iloc[i] = 1
continue
if i == atr_length:
st.iloc[i] = upper_band.iloc[i] if close.iloc[i] > hl2.iloc[i] else lower_band.iloc[i]
direction.iloc[i] = 1 if close.iloc[i] > st.iloc[i] else -1
else:
prev_st = st.iloc[i-1]
prev_dir = direction.iloc[i-1]
if prev_dir == 1:
if close.iloc[i] > prev_st:
st.iloc[i] = max(lower_band.iloc[i], prev_st)
direction.iloc[i] = 1
else:
st.iloc[i] = upper_band.iloc[i]
direction.iloc[i] = -1
else:
if close.iloc[i] < prev_st:
st.iloc[i] = min(upper_band.iloc[i], prev_st)
direction.iloc[i] = -1
else:
st.iloc[i] = lower_band.iloc[i]
direction.iloc[i] = 1
df[f'{prefix}supertrend'] = st
df[f'{prefix}direction'] = direction
return df
# ------------------------------------------------------------------
# SCRIP MASTER LOADER AND SECURITY ID LOOKUP
# ------------------------------------------------------------------
def download_scrip_master():
"""Download Dhan scrip master CSV to /tmp and return DataFrame."""
url = "https://images.dhan.co/api-data/api-scrip-master.csv"
local_path = "/tmp/dhan_scrip_master.csv"
try:
urllib.request.urlretrieve(url, local_path)
df = pd.read_csv(local_path, low_memory=False)
logger.info("Scrip master downloaded successfully")
return df
except Exception as e:
logger.error(f"Failed to download scrip master: {e}")
raise
def get_security_id(scrip_df, expiry, strike, opt_type):
"""
Look up security_id for a given option contract.
expiry: datetime.date object
strike: int (strike price)
opt_type: "CE" or "PE"
"""
expiry_str = expiry.strftime("%Y-%m-%d")
mask = (
(scrip_df["SEM_INSTRUMENT_NAME"] == "OPTIDX") &
(scrip_df["SEM_EXM_EXCH_ID"] == "NSE") &
(scrip_df["SEM_EXPIRY_DATE"].str.startswith(expiry_str)) &
(scrip_df["SEM_STRIKE_PRICE"].astype(float) == float(strike)) &
(scrip_df["SEM_OPTION_TYPE"] == opt_type)
)
hits = scrip_df[mask]
if not hits.empty:
return str(int(hits.iloc[0]["SEM_SMST_SECURITY_ID"]))
return None
# ------------------------------------------------------------------
# MAIN TRADER CLASS
# ------------------------------------------------------------------
class SupertrendLiveTrader:
def __init__(self, client_code, access_token):
# Fix 1: Correct client initialisation
dhan_context = DhanContext(client_code, access_token)
self.dhan = dhanhq(dhan_context)
# Constants provided by SDK
self.NSE_FNO = self.dhan.NSE_FNO
self.BUY = self.dhan.BUY
self.SELL = self.dhan.SELL
self.MARKET = self.dhan.MARKET
self.INTRA = self.dhan.INTRA
self.instrument = INSTRUMENT
self.strike_step = STRIKE_STEP
self.position_open = False
self.position_type = None
self.entry_time = None
self.atm_strike = None
self.legs = [] # will hold: {'security_id':, 'side':, 'entry_price':, 'exit_price':, 'symbol': (for logging)}
self.trailing_stop = None
self.current_expiry = None
self.expiry_list = []
self.call_data = {} # keyed by security_id (string)
self.put_data = {}
self.initial_capital = 100000
self.current_capital = self.initial_capital
self.tradebook = TradebookLogger("tradebook.csv")
# Download scrip master and build lookup
self.scrip_df = download_scrip_master()
self._get_expiry_dates()
self.current_expiry = self._get_current_expiry()
if self.current_expiry is None:
raise ValueError("No valid expiry found.")
logger.info(f"Trading expiry: {self.current_expiry}")
self._fetch_initial_data(self.current_expiry)
def _get_expiry_dates(self):
# Fix 9: Correct modulo for same-day Thursday
today = datetime.now().date()
days_ahead = (3 - today.weekday()) % 7 # Thursday = 3
next_thu = today + timedelta(days=days_ahead)
self.expiry_list = [next_thu + timedelta(days=7*i) for i in range(4)]
logger.info(f"Expiry dates: {self.expiry_list}")
def _get_current_expiry(self):
if not self.expiry_list:
return None
today = datetime.now().date()
for exp in self.expiry_list:
if exp < today:
continue
if (exp - today).days <= 1:
continue
return exp
for exp in self.expiry_list:
if exp >= today:
return exp
return None
def _get_underlying_ltp(self):
# Fix 2: Use get_market_quote_ltp with correct security ID for NIFTY
# NIFTY security_id is "13" on NSE_INDEX
try:
resp = self.dhan.get_market_quote_ltp({"IDX_I": ["13"]})
if resp and resp.get("status") == "success":
ltp = float(resp["data"]["IDX_I"]["13"]["last_price"])
return ltp
except Exception as e:
logger.warning(f"Failed to fetch underlying LTP: {e}")
return 26000 # fallback
def _get_security_id(self, expiry, strike, opt_type):
return get_security_id(self.scrip_df, expiry, strike, opt_type)
def _build_option_symbol_string(self, expiry, strike, opt_type):
"""For logging purposes only – not used for API calls."""
exp_str = expiry.strftime("%d %b").upper().lstrip('0')
return f"{self.instrument} {exp_str} {strike} {opt_type}"
def _load_historical_data(self, security_id, expiry, strike, opt_type, days=10):
# Fix 3: Use correct args for intraday_minute_data
from_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d 09:00:00')
to_date = datetime.now().strftime('%Y-%m-%d 15:30:00')
try:
response = self.dhan.intraday_minute_data(
security_id=security_id,
exchange_segment=self.NSE_FNO,
instrument_type="OPTIDX",
interval="1",
from_date=from_date,
to_date=to_date
)
except Exception as e:
logger.error(f"Error fetching data for security_id {security_id}: {e}")
return None
# Fix 4: Parse columnar response
if not response or 'timestamp' not in response:
logger.warning(f"No data returned for security_id {security_id}")
return None
data = response
# timestamps are in seconds; convert to datetime
ts = pd.to_datetime(data["timestamp"], unit='s', utc=True).tz_convert('Asia/Kolkata')
df = pd.DataFrame({
"timestamp": ts,
"open": data["open"],
"high": data["high"],
"low": data["low"],
"close": data["close"],
"volume": data["volume"]
})
return df
def _fetch_initial_data(self, expiry_date):
# Determine ATM strike from underlying LTP
underlying_ltp = self._get_underlying_ltp()
atm_strike = int(round(underlying_ltp / self.strike_step) * self.strike_step)
# Build all needed strikes and fetch data
for off in set(OFFSETS_LONG_CALLS + OFFSETS_SHORT_CALLS):
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry_date, strike, "CE")
if sec_id:
df = self._load_historical_data(sec_id, expiry_date, strike, "CE")
if df is not None and len(df) > max(ENTRY_ATR_LENGTH, EXIT_ATR_LENGTH) + 5:
df = df.sort_values('timestamp').reset_index(drop=True)
df = calculate_supertrend(df, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
df = calculate_supertrend(df, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
self.call_data[sec_id] = df
logger.info(f"Loaded CE {strike} rows: {len(df)}")
else:
logger.warning(f"Could not load CE {strike}")
for off in set(OFFSETS_SHORT_PUTS + OFFSETS_LONG_PUTS):
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry_date, strike, "PE")
if sec_id:
df = self._load_historical_data(sec_id, expiry_date, strike, "PE")
if df is not None and len(df) > max(ENTRY_ATR_LENGTH, EXIT_ATR_LENGTH) + 5:
df = df.sort_values('timestamp').reset_index(drop=True)
df = calculate_supertrend(df, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
df = calculate_supertrend(df, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
self.put_data[sec_id] = df
logger.info(f"Loaded PE {strike} rows: {len(df)}")
else:
logger.warning(f"Could not load PE {strike}")
def _update_data(self):
# For each security_id in call_data, fetch latest minute and update
for sec_id, df in list(self.call_data.items()):
try:
# We need the underlying strike and expiry to rebuild symbol? We'll just use sec_id directly.
response = self.dhan.intraday_minute_data(
security_id=sec_id,
exchange_segment=self.NSE_FNO,
instrument_type="OPTIDX",
interval="1",
from_date=datetime.now().strftime('%Y-%m-%d 09:00:00'),
to_date=datetime.now().strftime('%Y-%m-%d 15:30:00')
)
if response and 'timestamp' in response:
data = response
ts = pd.to_datetime(data["timestamp"], unit='s', utc=True).tz_convert('Asia/Kolkata')
new_df = pd.DataFrame({
"timestamp": ts,
"open": data["open"],
"high": data["high"],
"low": data["low"],
"close": data["close"],
"volume": data["volume"]
})
new_df = new_df.sort_values('timestamp').reset_index(drop=True)
combined = pd.concat([df, new_df], ignore_index=True)
combined = combined.drop_duplicates(subset=['timestamp'], keep='last')
if len(combined) > 200:
combined = combined.tail(200)
combined = calculate_supertrend(combined, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
combined = calculate_supertrend(combined, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
self.call_data[sec_id] = combined
except Exception as e:
logger.warning(f"Update failed for {sec_id}: {e}")
for sec_id, df in list(self.put_data.items()):
try:
response = self.dhan.intraday_minute_data(
security_id=sec_id,
exchange_segment=self.NSE_FNO,
instrument_type="OPTIDX",
interval="1",
from_date=datetime.now().strftime('%Y-%m-%d 09:00:00'),
to_date=datetime.now().strftime('%Y-%m-%d 15:30:00')
)
if response and 'timestamp' in response:
data = response
ts = pd.to_datetime(data["timestamp"], unit='s', utc=True).tz_convert('Asia/Kolkata')
new_df = pd.DataFrame({
"timestamp": ts,
"open": data["open"],
"high": data["high"],
"low": data["low"],
"close": data["close"],
"volume": data["volume"]
})
new_df = new_df.sort_values('timestamp').reset_index(drop=True)
combined = pd.concat([df, new_df], ignore_index=True)
combined = combined.drop_duplicates(subset=['timestamp'], keep='last')
if len(combined) > 200:
combined = combined.tail(200)
combined = calculate_supertrend(combined, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
combined = calculate_supertrend(combined, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
self.put_data[sec_id] = combined
except Exception as e:
logger.warning(f"Update failed for {sec_id}: {e}")
def _build_legs(self, atm_strike, timestamp, is_bullish):
legs = []
expiry = self.current_expiry
if is_bullish:
# Buy calls: ATM to ATM+5
for off in OFFSETS_LONG_CALLS:
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry, strike, "CE")
if sec_id and sec_id in self.call_data:
df = self.call_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
# Fix 6: entry price is close, not supertrend
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "CE")
legs.append({
'security_id': sec_id,
'side': 'long',
'entry_price': entry,
'exit_price': None,
'symbol': symbol
})
# Sell puts: ATM to ATM-5
for off in OFFSETS_SHORT_PUTS:
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry, strike, "PE")
if sec_id and sec_id in self.put_data:
df = self.put_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "PE")
legs.append({
'security_id': sec_id,
'side': 'short',
'entry_price': entry,
'exit_price': None,
'symbol': symbol
})
else:
# Buy puts: ATM to ATM-5
for off in OFFSETS_LONG_PUTS:
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry, strike, "PE")
if sec_id and sec_id in self.put_data:
df = self.put_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "PE")
legs.append({
'security_id': sec_id,
'side': 'long',
'entry_price': entry,
'exit_price': None,
'symbol': symbol
})
# Sell calls: ATM to ATM+5
for off in OFFSETS_SHORT_CALLS:
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry, strike, "CE")
if sec_id and sec_id in self.call_data:
df = self.call_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "CE")
legs.append({
'security_id': sec_id,
'side': 'short',
'entry_price': entry,
'exit_price': None,
'symbol': symbol
})
return legs
def _check_entry(self):
# Find latest common timestamp across all security_ids
all_ts = set()
for df in self.call_data.values():
all_ts.update(df['timestamp'].tolist())
for df in self.put_data.values():
all_ts.update(df['timestamp'].tolist())
if not all_ts:
return None
latest_ts = max(all_ts)
now_time = latest_ts.time()
if now_time < datetime.strptime(START_TIME, "%H:%M:%S").time() or \
now_time > datetime.strptime(LAST_ENTRY_TIME, "%H:%M:%S").time():
return None
underlying_ltp = self._get_underlying_ltp()
atm_strike = int(round(underlying_ltp / self.strike_step) * self.strike_step)
# Check bullish entry on ATM call
sec_id = self._get_security_id(self.current_expiry, atm_strike, "CE")
if sec_id and sec_id in self.call_data:
df = self.call_data[sec_id]
idx = df[df['timestamp'] == latest_ts].index
if len(idx) > 0:
idx = idx[0]
if idx > 0:
row = df.iloc[idx]
prev = df.iloc[idx-1]
if row['close'] > row['entry_supertrend'] and prev['close'] < prev['entry_supertrend']:
legs = self._build_legs(atm_strike, latest_ts, is_bullish=True)
if legs:
return ('bullish', atm_strike, legs)
# Check bearish entry on ATM put
sec_id = self._get_security_id(self.current_expiry, atm_strike, "PE")
if sec_id and sec_id in self.put_data:
df = self.put_data[sec_id]
idx = df[df['timestamp'] == latest_ts].index
if len(idx) > 0:
idx = idx[0]
if idx > 0:
row = df.iloc[idx]
prev = df.iloc[idx-1]
if row['close'] > row['entry_supertrend'] and prev['close'] < prev['entry_supertrend']:
legs = self._build_legs(atm_strike, latest_ts, is_bullish=False)
if legs:
return ('bearish', atm_strike, legs)
return None
def _check_exit(self):
if not self.position_open or not self.legs:
return False
# Find latest common timestamp across all legs
all_ts = set()
for leg in self.legs:
sec_id = leg['security_id']
if sec_id in self.call_data:
all_ts.update(self.call_data[sec_id]['timestamp'].tolist())
elif sec_id in self.put_data:
all_ts.update(self.put_data[sec_id]['timestamp'].tolist())
if not all_ts:
return False
latest_ts = max(all_ts)
# Use the ATM leg for trailing stop (first long leg for bullish, first long for bearish)
atm_leg = None
for leg in self.legs:
if self.position_type == 'bullish' and leg['side'] == 'long' and 'CE' in leg['symbol']:
atm_leg = leg
break
elif self.position_type == 'bearish' and leg['side'] == 'long' and 'PE' in leg['symbol']:
atm_leg = leg
break
if atm_leg is None:
atm_leg = self.legs[0]
sec_id = atm_leg['security_id']
if sec_id in self.call_data:
df = self.call_data[sec_id]
elif sec_id in self.put_data:
df = self.put_data[sec_id]
else:
return False
row = df[df['timestamp'] == latest_ts]
if len(row) == 0:
return False
row = row.iloc[0]
exit_st = row['exit_supertrend']
if np.isnan(exit_st):
return False
# Update trailing stop
if self.trailing_stop is None:
self.trailing_stop = exit_st
else:
if self.position_type == 'bullish':
self.trailing_stop = max(self.trailing_stop, exit_st)
else:
self.trailing_stop = min(self.trailing_stop, exit_st)
# Exit if close < trailing stop
if row['close'] < self.trailing_stop:
return True
return False
def _enter_position(self, entry_type, atm_strike, legs):
self.position_open = True
self.position_type = entry_type
self.entry_time = datetime.now()
self.atm_strike = atm_strike
self.legs = legs
self.trailing_stop = None
logger.info(f"=== ENTER {entry_type.upper()} POSITION ===")
logger.info(f"ATM Strike: {atm_strike}")
for leg in legs:
logger.info(f" {leg['side'].upper()} {leg['symbol']} @ {leg['entry_price']:.2f}")
# Uncomment to enable live trading:
# for leg in legs:
# order = self.dhan.place_order(
# security_id=leg['security_id'],
# exchange_segment=self.NSE_FNO,
# transaction_type=self.BUY if leg['side'] == 'long' else self.SELL,
# quantity=LOTS_PER_LEG * LOT_MULTIPLIER,
# order_type=self.MARKET,
# product_type=self.INTRA,
# price=0
# )
# logger.info(f"Order placed: {order}")
def _exit_position(self, exit_reason="supertrend_stop"):
if not self.position_open:
return
exit_time = datetime.now()
total_pnl = 0
for leg in self.legs:
sec_id = leg['security_id']
if sec_id in self.call_data:
df = self.call_data[sec_id]
elif sec_id in self.put_data:
df = self.put_data[sec_id]
else:
continue
latest = df.iloc[-1]
# Fix 7: exit price is close, not supertrend
exit_price = float(latest['close'])
leg['exit_price'] = exit_price
logger.info(f" {leg['side'].upper()} {leg['symbol']} @ {exit_price:.2f}")
entry = leg['entry_price']
if leg['side'] == 'long':
pnl = (exit_price - entry) * LOTS_PER_LEG * LOT_MULTIPLIER
else:
pnl = (entry - exit_price) * LOTS_PER_LEG * LOT_MULTIPLIER
pnl -= BROKERAGE_PER_LOT_PER_ORDER * 2
total_pnl += pnl
self.current_capital += total_pnl
self.tradebook.log_trade(
entry_time=self.entry_time,
exit_time=exit_time,
position_type=self.position_type,
exit_reason=exit_reason,
atm_strike=self.atm_strike,
total_pnl=total_pnl,
capital_after=self.current_capital,
legs=self.legs
)
logger.info(f"Total P&L: ₹{total_pnl:.2f}")
logger.info(f"Capital after trade: ₹{self.current_capital:.2f}")
self.position_open = False
self.legs = []
self.trailing_stop = None
def run(self):
logger.info("Starting main loop...")
while True:
try:
now = datetime.now()
current_time = now.time()
market_open = datetime.strptime("09:15:00", "%H:%M:%S").time()
market_close = datetime.strptime("15:30:00", "%H:%M:%S").time()
if current_time < market_open or current_time > market_close:
if current_time < market_open:
target = datetime.combine(now.date(), datetime.strptime("09:15:00", "%H:%M:%S").time())
else:
tomorrow = now + timedelta(days=1)
target = datetime.combine(tomorrow.date(), datetime.strptime("09:15:00", "%H:%M:%S").time())
sleep_secs = (target - now).total_seconds()
if sleep_secs > 0:
logger.info(f"Market closed. Sleeping for {sleep_secs:.0f} seconds.")
while sleep_secs > 0:
time.sleep(min(60, sleep_secs))
sleep_secs -= 60
continue
self._update_data()
if not self.position_open:
entry = self._check_entry()
if entry:
entry_type, atm_strike, legs = entry
self._enter_position(entry_type, atm_strike, legs)
else:
if self._check_exit():
self._exit_position("supertrend_stop")
if current_time >= datetime.strptime(FORCE_EXIT_START, "%H:%M:%S").time() and \
current_time <= datetime.strptime(FORCE_EXIT_END, "%H:%M:%S").time():
if self.position_open:
logger.info("Forced exit during exit window.")
self._exit_position("forced_exit")
time.sleep(60)
except Exception as e:
logger.error(f"Unexpected error in main loop: {e}")
time.sleep(10)
# ------------------------------------------------------------------
# ENTRY POINT
# ------------------------------------------------------------------
if __name__ == "__main__":
trader = SupertrendLiveTrader(CLIENT_CODE, ACCESS_TOKEN)
try:
trader.run()
except KeyboardInterrupt:
logger.info("Bot stopped by user.")
if trader.position_open:
trader._exit_position("manual_stop")
except Exception as e:
logger.error(f"Fatal error: {e}")
if trader.position_open:
trader._exit_position("error_exit")
raise
this is the error it is returning
›
[2026-06-18 11:35:26 IST] ==================== SCRIPT OUTPUT START ====================
›
[2026-06-18 11:35:29 IST] 2026-06-18 11:35:29,162 - INFO - Scrip master downloaded successfully
›
[2026-06-18 11:35:29 IST] 2026-06-18 11:35:29,162 - INFO - Expiry dates: [datetime.date(2026, 6, 18), datetime.date(2026, 6, 25), datetime.date(2026, 7, 2), datetime.date(2026, 7, 9)]
›
[2026-06-18 11:35:29 IST] 2026-06-18 11:35:29,162 - INFO - Trading expiry: 2026-06-25
›
[2026-06-18 11:35:29 IST] 2026-06-18 11:35:29,162 - WARNING - Failed to fetch underlying LTP: ‘dhanhq’ object has no attribute ‘get_market_quote_ltp’
›
can you help me with the code?
Hi @dattaswaami ,
Check this code -
"""
Supertrend Strategy Live Algorithm for Dhan Cloud.
Paper trading is enabled by default. Set PAPER_TRADING = False only after:
1. Dhan token is valid.
2. Data API plan is active.
3. Static IP is whitelisted for order placement from the cloud runtime.
"""
import time
import logging
import csv
import json
import random
import urllib.request
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
from dhanhq import DhanContext, dhanhq
# Dhan Cloud placeholders must stay quoted.
CLIENT_CODE = "{{CLIENT_ID}}"
ACCESS_TOKEN = "{{ACCESS_TOKEN}}"
# Configuration
INSTRUMENT = "NIFTY"
UNDERLYING_SECURITY_ID = "13" # NIFTY 50 index on IDX_I
STRIKE_STEP = 100
FALLBACK_LOT_SIZE = 50
LOTS_PER_LEG = 1
PAPER_TRADING = True
ENTRY_ATR_LENGTH = 10
ENTRY_MULTIPLIER = 3
EXIT_ATR_LENGTH = 7
EXIT_MULTIPLIER = 2
START_TIME = "09:25:00"
LAST_ENTRY_TIME = "15:00:00"
FORCE_EXIT_START = "15:15:00"
FORCE_EXIT_END = "15:20:00"
OFFSETS_LONG_CALLS = [0, 1, 2, 3, 4, 5]
OFFSETS_SHORT_PUTS = [0, -1, -2, -3, -4, -5]
OFFSETS_LONG_PUTS = [0, -1, -2, -3, -4, -5]
OFFSETS_SHORT_CALLS = [0, 1, 2, 3, 4, 5]
BROKERAGE_PER_LOT_PER_ORDER = 20
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class TradebookLogger:
def __init__(self, filename="tradebook.csv"):
self.filename = f"/tmp/{filename}"
self.header = [
"entry_time", "exit_time", "position_type",
"exit_reason", "atm_strike", "total_pnl",
"capital_after", "num_legs", "legs"
]
self._write_header_if_needed()
def _write_header_if_needed(self):
# Security: use try/except instead of os.path.exists
try:
with open(self.filename, 'r') as f:
first_line = f.readline().strip()
expected = ','.join(self.header)
if first_line != expected:
self._write_header()
except (FileNotFoundError, IOError):
self._write_header()
def _write_header(self):
try:
with open(self.filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(self.header)
except Exception as e:
logger.error(f"Failed to write tradebook header: {e}")
def log_trade(self, entry_time, exit_time, position_type, exit_reason, atm_strike, total_pnl, capital_after, legs):
legs_json = json.dumps(legs)
row = [
entry_time.isoformat(),
exit_time.isoformat(),
position_type,
exit_reason,
atm_strike,
round(total_pnl, 2),
round(capital_after, 2),
len(legs),
legs_json
]
try:
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow(row)
except Exception as e:
logger.error(f"Failed to log trade: {e}")
def calculate_supertrend(df, atr_length=10, multiplier=3, prefix=''):
high = df['high']
low = df['low']
close = df['close']
atr = pd.Series(index=df.index, dtype=float)
for i in range(len(df)):
if i < atr_length:
atr.iloc[i] = 0
else:
tr = max(high.iloc[i] - low.iloc[i],
abs(high.iloc[i] - close.iloc[i-1]),
abs(low.iloc[i] - close.iloc[i-1]))
atr.iloc[i] = (atr.iloc[i-1] * (atr_length - 1) + tr) / atr_length
hl2 = (high + low) / 2
upper_band = hl2 + (multiplier * atr)
lower_band = hl2 - (multiplier * atr)
st = pd.Series(index=df.index, dtype=float)
direction = pd.Series(index=df.index, dtype=int)
for i in range(len(df)):
if i < atr_length:
st.iloc[i] = 0
direction.iloc[i] = 1
continue
if i == atr_length:
st.iloc[i] = upper_band.iloc[i] if close.iloc[i] > hl2.iloc[i] else lower_band.iloc[i]
direction.iloc[i] = 1 if close.iloc[i] > st.iloc[i] else -1
else:
prev_st = st.iloc[i-1]
prev_dir = direction.iloc[i-1]
if prev_dir == 1:
if close.iloc[i] > prev_st:
st.iloc[i] = max(lower_band.iloc[i], prev_st)
direction.iloc[i] = 1
else:
st.iloc[i] = upper_band.iloc[i]
direction.iloc[i] = -1
else:
if close.iloc[i] < prev_st:
st.iloc[i] = min(upper_band.iloc[i], prev_st)
direction.iloc[i] = -1
else:
st.iloc[i] = lower_band.iloc[i]
direction.iloc[i] = 1
df[f'{prefix}supertrend'] = st
df[f'{prefix}direction'] = direction
return df
def download_scrip_master():
"""Download Dhan scrip master CSV to /tmp and return DataFrame."""
url = "https://images.dhan.co/api-data/api-scrip-master.csv"
local_path = "/tmp/dhan_scrip_master.csv"
try:
df = dhanhq.fetch_security_list("compact")
logger.info("Scrip master loaded through dhanhq SDK")
return df
except Exception as sdk_error:
logger.warning(f"SDK scrip master load failed, falling back to CSV URL: {sdk_error}")
try:
urllib.request.urlretrieve(url, local_path)
df = pd.read_csv(local_path, low_memory=False)
logger.info("Scrip master downloaded successfully")
return df
except Exception as e:
logger.error(f"Failed to download scrip master: {e}")
raise
def get_option_contract(scrip_df, expiry, strike, opt_type):
"""
Look up security_id and lot size for a given option contract.
expiry: datetime.date object
strike: int (strike price)
opt_type: "CE" or "PE"
"""
expiry_str = expiry.strftime("%Y-%m-%d")
mask = (
(scrip_df["SEM_INSTRUMENT_NAME"] == "OPTIDX") &
(scrip_df["SEM_EXM_EXCH_ID"] == "NSE") &
(scrip_df["SEM_CUSTOM_SYMBOL"].astype(str).str.upper() == INSTRUMENT) &
(scrip_df["SEM_EXPIRY_DATE"].astype(str).str.startswith(expiry_str)) &
(scrip_df["SEM_STRIKE_PRICE"].astype(float) == float(strike)) &
(scrip_df["SEM_OPTION_TYPE"] == opt_type)
)
hits = scrip_df[mask]
if not hits.empty:
row = hits.iloc[0]
lot_size = row.get("SEM_LOT_UNITS", FALLBACK_LOT_SIZE)
if pd.isna(lot_size):
lot_size = FALLBACK_LOT_SIZE
return {
"security_id": str(int(row["SEM_SMST_SECURITY_ID"])),
"lot_size": int(lot_size),
}
return None
def get_security_id(scrip_df, expiry, strike, opt_type):
contract = get_option_contract(scrip_df, expiry, strike, opt_type)
if contract:
return contract["security_id"]
return None
def unwrap_dhan_data(response, label):
"""Return response['data'] for current SDK responses, with a legacy fallback."""
if not response:
logger.warning(f"No response returned for {label}")
return None
if isinstance(response, dict) and response.get("status") == "failure":
logger.warning(f"Dhan API failure for {label}: {response.get('remarks') or response}")
return None
if isinstance(response, dict) and "data" in response:
return response["data"]
return response
def minute_response_to_df(response, label):
data = unwrap_dhan_data(response, label)
if not data or "timestamp" not in data:
logger.warning(f"No candle data returned for {label}: {response}")
return None
ts = pd.to_datetime(data["timestamp"], unit="s", utc=True).tz_convert("Asia/Kolkata")
return pd.DataFrame({
"timestamp": ts,
"open": data["open"],
"high": data["high"],
"low": data["low"],
"close": data["close"],
"volume": data["volume"],
})
class SupertrendLiveTrader:
def __init__(self, client_code, access_token):
dhan_context = DhanContext(client_code, access_token)
self.dhan = dhanhq(dhan_context)
self.NSE_FNO = dhanhq.NSE_FNO
self.BUY = dhanhq.BUY
self.SELL = dhanhq.SELL
self.MARKET = dhanhq.MARKET
self.INTRA = dhanhq.INTRA
self.DAY = dhanhq.DAY
self.instrument = INSTRUMENT
self.strike_step = STRIKE_STEP
self.position_open = False
self.position_type = None
self.entry_time = None
self.atm_strike = None
self.legs = [] # will hold: {'security_id':, 'side':, 'entry_price':, 'exit_price':, 'symbol': (for logging)}
self.trailing_stop = None
self.current_expiry = None
self.expiry_list = []
self.call_data = {} # keyed by security_id (string)
self.put_data = {}
self.initial_capital = 100000
self.current_capital = self.initial_capital
self.tradebook = TradebookLogger("tradebook.csv")
# Download scrip master and build lookup
self.scrip_df = download_scrip_master()
self._get_expiry_dates()
self.current_expiry = self._get_current_expiry()
if self.current_expiry is None:
raise ValueError("No valid expiry found.")
logger.info(f"Trading expiry: {self.current_expiry}")
self._fetch_initial_data(self.current_expiry)
def _get_expiry_dates(self):
today = datetime.now().date()
days_ahead = (3 - today.weekday()) % 7 # Thursday = 3
next_thu = today + timedelta(days=days_ahead)
self.expiry_list = [next_thu + timedelta(days=7*i) for i in range(4)]
logger.info(f"Expiry dates: {self.expiry_list}")
def _get_current_expiry(self):
if not self.expiry_list:
return None
today = datetime.now().date()
for exp in self.expiry_list:
if exp < today:
continue
if (exp - today).days <= 1:
continue
return exp
for exp in self.expiry_list:
if exp >= today:
return exp
return None
def _get_underlying_ltp(self):
try:
resp = self.dhan.ticker_data({"IDX_I": [int(UNDERLYING_SECURITY_ID)]})
data = unwrap_dhan_data(resp, "NIFTY ticker")
if data and "IDX_I" in data:
ltp = float(data["IDX_I"][UNDERLYING_SECURITY_ID]["last_price"])
return ltp
except Exception as e:
logger.warning(f"Failed to fetch underlying LTP: {e}")
return 26000 # fallback
def _get_security_id(self, expiry, strike, opt_type):
return get_security_id(self.scrip_df, expiry, strike, opt_type)
def _get_option_contract(self, expiry, strike, opt_type):
return get_option_contract(self.scrip_df, expiry, strike, opt_type)
def _build_option_symbol_string(self, expiry, strike, opt_type):
"""For logging only."""
exp_str = expiry.strftime("%d %b").upper().lstrip('0')
return f"{self.instrument} {exp_str} {strike} {opt_type}"
def _load_historical_data(self, security_id, expiry, strike, opt_type, days=10):
from_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d 09:00:00')
to_date = datetime.now().strftime('%Y-%m-%d 15:30:00')
try:
response = self.dhan.intraday_minute_data(
security_id=security_id,
exchange_segment=self.NSE_FNO,
instrument_type="OPTIDX",
interval=1,
from_date=from_date,
to_date=to_date
)
except Exception as e:
logger.error(f"Error fetching data for security_id {security_id}: {e}")
return None
return minute_response_to_df(response, f"{opt_type} {strike} {security_id}")
def _fetch_initial_data(self, expiry_date):
# Determine ATM strike from underlying LTP
underlying_ltp = self._get_underlying_ltp()
atm_strike = int(round(underlying_ltp / self.strike_step) * self.strike_step)
# Build all needed strikes and fetch data
for off in set(OFFSETS_LONG_CALLS + OFFSETS_SHORT_CALLS):
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry_date, strike, "CE")
if sec_id:
df = self._load_historical_data(sec_id, expiry_date, strike, "CE")
if df is not None and len(df) > max(ENTRY_ATR_LENGTH, EXIT_ATR_LENGTH) + 5:
df = df.sort_values('timestamp').reset_index(drop=True)
df = calculate_supertrend(df, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
df = calculate_supertrend(df, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
self.call_data[sec_id] = df
logger.info(f"Loaded CE {strike} rows: {len(df)}")
else:
logger.warning(f"Could not load CE {strike}")
for off in set(OFFSETS_SHORT_PUTS + OFFSETS_LONG_PUTS):
strike = atm_strike + off * self.strike_step
sec_id = self._get_security_id(expiry_date, strike, "PE")
if sec_id:
df = self._load_historical_data(sec_id, expiry_date, strike, "PE")
if df is not None and len(df) > max(ENTRY_ATR_LENGTH, EXIT_ATR_LENGTH) + 5:
df = df.sort_values('timestamp').reset_index(drop=True)
df = calculate_supertrend(df, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
df = calculate_supertrend(df, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
self.put_data[sec_id] = df
logger.info(f"Loaded PE {strike} rows: {len(df)}")
else:
logger.warning(f"Could not load PE {strike}")
def _refresh_option_data(self, option_data, label):
for sec_id, df in list(option_data.items()):
try:
response = self.dhan.intraday_minute_data(
security_id=sec_id,
exchange_segment=self.NSE_FNO,
instrument_type="OPTIDX",
interval=1,
from_date=datetime.now().strftime('%Y-%m-%d 09:00:00'),
to_date=datetime.now().strftime('%Y-%m-%d 15:30:00')
)
new_df = minute_response_to_df(response, f"{label} update {sec_id}")
if new_df is not None:
new_df = new_df.sort_values('timestamp').reset_index(drop=True)
combined = pd.concat([df, new_df], ignore_index=True)
combined = combined.drop_duplicates(subset=['timestamp'], keep='last')
if len(combined) > 200:
combined = combined.tail(200)
combined = calculate_supertrend(combined, ENTRY_ATR_LENGTH, ENTRY_MULTIPLIER, 'entry_')
combined = calculate_supertrend(combined, EXIT_ATR_LENGTH, EXIT_MULTIPLIER, 'exit_')
option_data[sec_id] = combined
except Exception as e:
logger.warning(f"Update failed for {sec_id}: {e}")
def _update_data(self):
self._refresh_option_data(self.call_data, "CE")
self._refresh_option_data(self.put_data, "PE")
def _build_legs(self, atm_strike, timestamp, is_bullish):
legs = []
expiry = self.current_expiry
if is_bullish:
# Buy calls: ATM to ATM+5
for off in OFFSETS_LONG_CALLS:
strike = atm_strike + off * self.strike_step
contract = self._get_option_contract(expiry, strike, "CE")
sec_id = contract["security_id"] if contract else None
if sec_id and sec_id in self.call_data:
df = self.call_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "CE")
legs.append({
'security_id': sec_id,
'side': 'long',
'entry_price': entry,
'exit_price': None,
'lot_size': contract["lot_size"],
'symbol': symbol
})
# Sell puts: ATM to ATM-5
for off in OFFSETS_SHORT_PUTS:
strike = atm_strike + off * self.strike_step
contract = self._get_option_contract(expiry, strike, "PE")
sec_id = contract["security_id"] if contract else None
if sec_id and sec_id in self.put_data:
df = self.put_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "PE")
legs.append({
'security_id': sec_id,
'side': 'short',
'entry_price': entry,
'exit_price': None,
'lot_size': contract["lot_size"],
'symbol': symbol
})
else:
# Buy puts: ATM to ATM-5
for off in OFFSETS_LONG_PUTS:
strike = atm_strike + off * self.strike_step
contract = self._get_option_contract(expiry, strike, "PE")
sec_id = contract["security_id"] if contract else None
if sec_id and sec_id in self.put_data:
df = self.put_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "PE")
legs.append({
'security_id': sec_id,
'side': 'long',
'entry_price': entry,
'exit_price': None,
'lot_size': contract["lot_size"],
'symbol': symbol
})
# Sell calls: ATM to ATM+5
for off in OFFSETS_SHORT_CALLS:
strike = atm_strike + off * self.strike_step
contract = self._get_option_contract(expiry, strike, "CE")
sec_id = contract["security_id"] if contract else None
if sec_id and sec_id in self.call_data:
df = self.call_data[sec_id]
row = df[df['timestamp'] == timestamp]
if len(row) > 0:
entry = row.iloc[0]['close']
symbol = self._build_option_symbol_string(expiry, strike, "CE")
legs.append({
'security_id': sec_id,
'side': 'short',
'entry_price': entry,
'exit_price': None,
'lot_size': contract["lot_size"],
'symbol': symbol
})
return legs
def _check_entry(self):
# Find latest common timestamp across all security_ids
all_ts = set()
for df in self.call_data.values():
all_ts.update(df['timestamp'].tolist())
for df in self.put_data.values():
all_ts.update(df['timestamp'].tolist())
if not all_ts:
return None
latest_ts = max(all_ts)
now_time = latest_ts.time()
if now_time < datetime.strptime(START_TIME, "%H:%M:%S").time() or \
now_time > datetime.strptime(LAST_ENTRY_TIME, "%H:%M:%S").time():
return None
underlying_ltp = self._get_underlying_ltp()
atm_strike = int(round(underlying_ltp / self.strike_step) * self.strike_step)
# Check bullish entry on ATM call
sec_id = self._get_security_id(self.current_expiry, atm_strike, "CE")
if sec_id and sec_id in self.call_data:
df = self.call_data[sec_id]
idx = df[df['timestamp'] == latest_ts].index
if len(idx) > 0:
idx = idx[0]
if idx > 0:
row = df.iloc[idx]
prev = df.iloc[idx-1]
if row['close'] > row['entry_supertrend'] and prev['close'] < prev['entry_supertrend']:
legs = self._build_legs(atm_strike, latest_ts, is_bullish=True)
if legs:
return ('bullish', atm_strike, legs)
# Check bearish entry on ATM put
sec_id = self._get_security_id(self.current_expiry, atm_strike, "PE")
if sec_id and sec_id in self.put_data:
df = self.put_data[sec_id]
idx = df[df['timestamp'] == latest_ts].index
if len(idx) > 0:
idx = idx[0]
if idx > 0:
row = df.iloc[idx]
prev = df.iloc[idx-1]
if row['close'] > row['entry_supertrend'] and prev['close'] < prev['entry_supertrend']:
legs = self._build_legs(atm_strike, latest_ts, is_bullish=False)
if legs:
return ('bearish', atm_strike, legs)
return None
def _check_exit(self):
if not self.position_open or not self.legs:
return False
# Find latest common timestamp across all legs
all_ts = set()
for leg in self.legs:
sec_id = leg['security_id']
if sec_id in self.call_data:
all_ts.update(self.call_data[sec_id]['timestamp'].tolist())
elif sec_id in self.put_data:
all_ts.update(self.put_data[sec_id]['timestamp'].tolist())
if not all_ts:
return False
latest_ts = max(all_ts)
# Use the ATM leg for trailing stop (first long leg for bullish, first long for bearish)
atm_leg = None
for leg in self.legs:
if self.position_type == 'bullish' and leg['side'] == 'long' and 'CE' in leg['symbol']:
atm_leg = leg
break
elif self.position_type == 'bearish' and leg['side'] == 'long' and 'PE' in leg['symbol']:
atm_leg = leg
break
if atm_leg is None:
atm_leg = self.legs[0]
sec_id = atm_leg['security_id']
if sec_id in self.call_data:
df = self.call_data[sec_id]
elif sec_id in self.put_data:
df = self.put_data[sec_id]
else:
return False
row = df[df['timestamp'] == latest_ts]
if len(row) == 0:
return False
row = row.iloc[0]
exit_st = row['exit_supertrend']
if np.isnan(exit_st):
return False
# Update trailing stop
if self.trailing_stop is None:
self.trailing_stop = exit_st
else:
if self.position_type == 'bullish':
self.trailing_stop = max(self.trailing_stop, exit_st)
else:
self.trailing_stop = min(self.trailing_stop, exit_st)
# Exit if close < trailing stop
if row['close'] < self.trailing_stop:
return True
return False
def _leg_quantity(self, leg):
return int(LOTS_PER_LEG * int(leg.get("lot_size", FALLBACK_LOT_SIZE)))
def _fake_order_id(self):
return str(random.randint(1000000000, 9999999999))
def _get_option_ltp(self, security_id):
try:
resp = self.dhan.ticker_data({"NSE_FNO": [int(security_id)]})
data = unwrap_dhan_data(resp, f"option ticker {security_id}")
if data and "NSE_FNO" in data and str(security_id) in data["NSE_FNO"]:
return float(data["NSE_FNO"][str(security_id)]["last_price"])
except Exception as e:
logger.warning(f"Failed to fetch option LTP for {security_id}: {e}")
return None
def _place_market_order(self, leg, transaction_type, reason):
if PAPER_TRADING:
ltp = self._get_option_ltp(leg["security_id"])
fallback_price = leg.get("entry_price") if reason == "entry" else leg.get("exit_price")
fill_price = ltp if ltp is not None else float(fallback_price or 0)
order_id = self._fake_order_id()
if reason == "entry":
leg["entry_price"] = fill_price
leg["entry_order_id"] = order_id
else:
leg["exit_price"] = fill_price
leg["exit_order_id"] = order_id
logger.info(
f"PAPER {reason}: order_id={order_id} {transaction_type} {leg['symbol']} "
f"qty={self._leg_quantity(leg)} ltp={fill_price:.2f} security_id={leg['security_id']}"
)
return {
"status": "success",
"remarks": "paper trade",
"data": {
"orderId": order_id,
"orderStatus": "TRADED",
"tradedPrice": fill_price,
"quantity": self._leg_quantity(leg),
},
}
response = self.dhan.place_order(
security_id=leg["security_id"],
exchange_segment=self.NSE_FNO,
transaction_type=transaction_type,
quantity=self._leg_quantity(leg),
order_type=self.MARKET,
product_type=self.INTRA,
price=0,
validity=self.DAY,
tag=f"st_{reason}_{datetime.now().strftime('%H%M%S')}",
)
if not response:
logger.error(f"Empty order response for {leg['symbol']}")
elif response.get("status") == "failure":
logger.error(f"Order failed for {leg['symbol']}: {response.get('remarks') or response}")
else:
logger.info(f"Order placed for {leg['symbol']}: {response}")
return response
def _enter_position(self, entry_type, atm_strike, legs):
self.position_open = True
self.position_type = entry_type
self.entry_time = datetime.now()
self.atm_strike = atm_strike
self.legs = legs
self.trailing_stop = None
logger.info(f"=== ENTER {entry_type.upper()} POSITION ===")
logger.info(f"ATM Strike: {atm_strike}")
for leg in legs:
transaction_type = self.BUY if leg['side'] == 'long' else self.SELL
self._place_market_order(leg, transaction_type, "entry")
logger.info(
f" {leg['side'].upper()} {leg['symbol']} "
f"qty={self._leg_quantity(leg)} @ {leg['entry_price']:.2f} "
f"order_id={leg.get('entry_order_id', '')}"
)
def _exit_position(self, exit_reason="supertrend_stop"):
if not self.position_open:
return
exit_time = datetime.now()
total_pnl = 0
for leg in self.legs:
sec_id = leg['security_id']
if sec_id in self.call_data:
df = self.call_data[sec_id]
elif sec_id in self.put_data:
df = self.put_data[sec_id]
else:
continue
latest = df.iloc[-1]
leg['exit_price'] = float(latest['close'])
entry = leg['entry_price']
if leg['side'] == 'long':
exit_transaction_type = self.SELL
else:
exit_transaction_type = self.BUY
self._place_market_order(leg, exit_transaction_type, "exit")
exit_price = float(leg["exit_price"])
logger.info(
f" {leg['side'].upper()} {leg['symbol']} @ {exit_price:.2f} "
f"order_id={leg.get('exit_order_id', '')}"
)
if leg['side'] == 'long':
pnl = (exit_price - entry) * self._leg_quantity(leg)
else:
pnl = (entry - exit_price) * self._leg_quantity(leg)
pnl -= BROKERAGE_PER_LOT_PER_ORDER * 2
total_pnl += pnl
self.current_capital += total_pnl
self.tradebook.log_trade(
entry_time=self.entry_time,
exit_time=exit_time,
position_type=self.position_type,
exit_reason=exit_reason,
atm_strike=self.atm_strike,
total_pnl=total_pnl,
capital_after=self.current_capital,
legs=self.legs
)
logger.info(f"Total P&L: Rs. {total_pnl:.2f}")
logger.info(f"Capital after trade: Rs. {self.current_capital:.2f}")
self.position_open = False
self.legs = []
self.trailing_stop = None
def run(self):
logger.info("Starting main loop...")
while True:
try:
now = datetime.now()
current_time = now.time()
market_open = datetime.strptime("09:15:00", "%H:%M:%S").time()
market_close = datetime.strptime("15:30:00", "%H:%M:%S").time()
if current_time < market_open or current_time > market_close:
if current_time < market_open:
target = datetime.combine(now.date(), datetime.strptime("09:15:00", "%H:%M:%S").time())
else:
tomorrow = now + timedelta(days=1)
target = datetime.combine(tomorrow.date(), datetime.strptime("09:15:00", "%H:%M:%S").time())
sleep_secs = (target - now).total_seconds()
if sleep_secs > 0:
logger.info(f"Market closed. Sleeping for {sleep_secs:.0f} seconds.")
while sleep_secs > 0:
time.sleep(min(60, sleep_secs))
sleep_secs -= 60
continue
self._update_data()
if not self.position_open:
entry = self._check_entry()
if entry:
entry_type, atm_strike, legs = entry
self._enter_position(entry_type, atm_strike, legs)
else:
if self._check_exit():
self._exit_position("supertrend_stop")
if current_time >= datetime.strptime(FORCE_EXIT_START, "%H:%M:%S").time() and \
current_time <= datetime.strptime(FORCE_EXIT_END, "%H:%M:%S").time():
if self.position_open:
logger.info("Forced exit during exit window.")
self._exit_position("forced_exit")
time.sleep(60)
except Exception as e:
logger.error(f"Unexpected error in main loop: {e}")
time.sleep(10)
if __name__ == "__main__":
trader = SupertrendLiveTrader(CLIENT_CODE, ACCESS_TOKEN)
try:
trader.run()
except KeyboardInterrupt:
logger.info("Bot stopped by user.")
if trader.position_open:
trader._exit_position("manual_stop")
except Exception as e:
logger.error(f"Fatal error: {e}")
if trader.position_open:
trader._exit_position("error_exit")
raise