import asyncio
import json
import logging
from binance import AsyncClient, BinanceSocketManager
import pandas as pd

class DataConnector:
    def __init__(self, api_key=None, api_secret=None, testnet=True):
        self.client = None
        self.bsm = None
        self.api_key = api_key
        self.api_secret = api_secret
        self.testnet = testnet
        self.buffers = {}  # {symbol: pd.DataFrame}
        self.max_buffer_size = 200
        self.is_running = False

    async def start(self, symbols, timeframe):
        try:
            self.client = await AsyncClient.create(self.api_key, self.api_secret, testnet=self.testnet)
            self.bsm = BinanceSocketManager(self.client)
            self.is_running = True
            
            logging.info(f"Starting DataConnector for {symbols} on {timeframe}")
            
            # Initialize buffers with historical data
            for symbol in symbols:
                try:
                    klines = await self.client.get_klines(symbol=symbol, interval=timeframe, limit=self.max_buffer_size)
                    df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av', 'ignore'])
                    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
                    for col in ['open', 'high', 'low', 'close', 'volume']:
                        df[col] = df[col].astype(float)
                    self.buffers[symbol] = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
                except Exception as e:
                    logging.error(f"Error fetching historical data for {symbol}: {e}")
                    self.buffers[symbol] = pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

            # Start WebSocket streams
            tasks = [self.stream_handler(symbol, timeframe) for symbol in symbols]
            await asyncio.gather(*tasks)
        except Exception as e:
            logging.error(f"Fatal error in DataConnector: {e}")

    async def stream_handler(self, symbol, timeframe):
        retry_delay = 5
        while self.is_running:
            try:
                socket = self.bsm.kline_socket(symbol, interval=timeframe)
                async with socket as stream:
                    logging.info(f"WebSocket connected for {symbol}")
                    while self.is_running:
                        res = await asyncio.wait_for(stream.recv(), timeout=60)
                        if res and 'k' in res:
                            k = res['k']
                            new_row = {
                                'timestamp': pd.to_datetime(k['t'], unit='ms'),
                                'open': float(k['o']),
                                'high': float(k['h']),
                                'low': float(k['l']),
                                'close': float(k['c']),
                                'volume': float(k['v'])
                            }
                            df = self.buffers[symbol]
                            
                            if k['x']:  # Candle closed
                                df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
                                if len(df) > self.max_buffer_size:
                                    df = df.iloc[-self.max_buffer_size:]
                            else:
                                # Update current forming candle (last row) by replacing it
                                df = df.iloc[:-1]
                                df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
                                
                            self.buffers[symbol] = df
                            logging.debug(f"Buffer updated for {symbol}")
            except (asyncio.TimeoutError, Exception) as e:
                logging.warning(f"WebSocket error for {symbol}: {e}. Retrying in {retry_delay}s...")
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 60) # Exponential backoff

    async def stop(self):
        self.is_running = False
        if self.client:
            await self.client.close_connection()
