import logging
logging.getLogger('telethon').setLevel(logging.CRITICAL)

# functions.py
from telethon.tl.types import JsonObject, JsonObjectValue, JsonString, JsonNumber, Message
from telethon import TelegramClient, functions, types, errors

import zipfile
import asyncio
import random
import httpx
import glob
import re
import os
from typing import Union, Optional
from utils.database import Settings
from utils.logger import logger
from dataclasses import dataclass, field
from collections import deque

GMAIL_SERVICE: str = "1"
GMAIL_API_KEY: str = "540916392:cVkqKkXSQCOBsbDYTgVT"
CHUNK: int = 25
# gmail_semp = asyncio.Semaphore(value=1)

@dataclass
class Gmail:
    address: str
    request_id: str
    issucess: bool
    used: int = field(default=0)
    code: str = field(default=None)

class EmailKeeper:
    _queue: asyncio.Queue = asyncio.Queue()

    @classmethod
    async def add_gmail(cls, gmail: "Gmail") -> None:
        if gmail.used < 2:
            await cls._queue.put(gmail)
    
    @classmethod
    async def get_gmail_async(cls, timeout: float = 0.0) -> Optional["Gmail"]:
        try:
            if timeout > 0:
                gmail = await asyncio.wait_for(cls._queue.get(), timeout=timeout)
            else:
                gmail = cls._queue.get_nowait()
        except (asyncio.QueueEmpty, asyncio.TimeoutError):
            return None

        return gmail
    
# class EmailKeeper:
#     emails = deque()

#     @classmethod
#     def add_gmail(cls, gmail: Gmail) -> None:
#         if gmail.used < 2:
#             cls.emails.appendleft(gmail)
    
#     @classmethod
#     def get_gmail_sync(cls) -> Union[Gmail, None]:
#         if len(cls.emails) == 0:
#             return None
        
#         _email: Gmail = cls.emails.popleft()
#         return _email
    
#     @classmethod
#     async def get_gmail_async(cls) -> Union[Gmail, None]:
#         async with gmail_semp:
#             if len(cls.emails) == 0:
#                 return None
            
#             _email: Gmail = cls.emails.popleft()
#             await asyncio.sleep(0.1)
#             return _email



def chunk_generate(input_list, chunk_size):
    '''List Chunk Generate'''
    for i in range(0, len(input_list), chunk_size):
        yield input_list[i:i + chunk_size]

def convert_to_sticker(status: bool) -> str:
    return '✅' if status else '❌'


def convert_status(status: bool) -> str:
    return '🔴️️️️️️ Disable' if status else '🟢️️️️️️ Enable'


def get_sessions() -> list:
    sessions = glob.glob('sessions/*.session')
    sessions.sort(key=lambda x: os.path.getmtime(x))
    return sessions


def get_proxy() -> Union[str, None]:
    try:
        with open('utils/proxies.txt', 'r', encoding='utf-8') as f:
            proxies = [line.strip() for line in f if line.strip() and not line.strip().startswith('#')]
        return random.choice(proxies) if proxies else None
    except FileNotFoundError:
        logger.warning('utils/proxies.txt not found. Continuing without proxies.')
        return None

        
def parse_proxy_line(line: Union[str, None]) -> Union[dict, None]:
    if line:
        parts = line.split(':')
        try:
            if len(parts) == 2:
                addr, port = parts
                return {'proxy_type': 'socks5', 'addr': addr, 'port': int(port)}
            elif len(parts) >= 4:
                return {'proxy_type': 'socks5', 'addr': parts[0], 'port': int(parts[1]), 'username': parts[2], 'password': parts[3]}
            else:
                return None
        except Exception:
            return None
    return None


def get_push_token() -> Union[str, None]:
    try:
        with open('utils/push_tokens.txt', 'r', encoding='utf-8') as f:
            tokens = [l.strip() for l in f if l.strip() and not l.startswith('#')]
        return random.choice(tokens) if tokens else None
    except FileNotFoundError:
        logger.warning('utils/push_tokens.txt not found.')
        return None


def get_random_system_version() -> str:
    return random.choice(['SDK 31', 'SDK 32', 'SDK 33', 'SDK 34'])


def get_random_app_version() -> str:
    return random.choice(['11.6.2 (56152)', '12.0.1 (6166)', '12.1.1 (6211)'])


def get_random_device_model() -> str:
    try:
        with open('utils/device.txt', 'r', encoding='utf-8') as f:
            devices = [line.strip() for line in f.readlines() if line.strip() and not line.startswith('#')]
        return random.choice(devices).strip() if devices else 'Pixel 5'
    except FileNotFoundError:
        return 'Pixel 5'


async def patch_client(client: TelegramClient) -> None:
    try:
        client._init_request.lang_pack = 'android'
        client._init_request.params = JsonObject([
            JsonObjectValue('device_token', JsonString(get_push_token())),
            JsonObjectValue('data', JsonString('49C1522548EBACD46CE322B6FD47F6092BB745D0F88082145CAF35E14DCC38E1')),
            JsonObjectValue('installer', JsonString('com.android.vending')),
            JsonObjectValue('package_id', JsonString('org.telegram.messenger.web')),
            JsonObjectValue('tz_offset', JsonNumber(-3600)),
            JsonObjectValue('perf_cat', JsonNumber(2)),
        ])
    except Exception as e:
        logger.debug(f'[patch_client] warning: couldn\'t patch init_request: {e}')


async def create_client_for_session(proxy: Union[dict, None], connect_timeout: int = 20) -> Union[TelegramClient, None]:
    try:
        session_path = random.choice(get_sessions())
        
        telegram = TelegramClient(
            session=session_path,
            api_id=6,
            api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e',
            device_model='samsungSM-A125F', # get_random_device_model()
            system_version='SDK 33', # get_random_system_version()
            app_version='11.6.2 (56152)', # get_random_app_version()
            system_lang_code='fr-CA',
            lang_code='fr',
            proxy=proxy,
            connection_retries=1,
            receive_updates=False,
            timeout=connect_timeout,
            auto_reconnect=False,
        )
        
        await patch_client(telegram)
        await asyncio.wait_for(telegram.connect(), timeout=connect_timeout)
        
        return telegram, session_path

    except asyncio.TimeoutError:
        logger.error(f'[create_client] Timeout connecting session {os.path.basename(session_path)} (proxy may be slow).')
        
        try:
            await telegram.disconnect()
        except Exception:
            pass
        
        return None
    
    except Exception as error:
        logger.error(f'[create_client] Error creating client for {os.path.basename(session_path)}: {error}')
        
        try:
            await telegram.disconnect()
        except Exception:
            pass
        
        return None


async def get_gmail(api_key: str, server: str) ->  Union[Gmail, None]:
    url: str = f"https://venusads.ir/api/V1/email/getEmail/?key={api_key}&server={server}"
    content: str = None

    try:
        async with httpx.AsyncClient(timeout=25, verify=False) as client:
            response = await client.get(url)
            content: str = response.text
            response_json: dict = response.json()

            response.raise_for_status()
            if not isinstance(response_json, dict): response_json = {}
            if response_json.get('status') != 200 :return None

            email: Union[str, None] = response_json.get('email', None)
            request_id: Union[str, None] = response_json.get('requestID', None)

            return Gmail(
                address=email,
                request_id=request_id,
                issucess= True if email != None and request_id != None else False
            )
            
    except Exception as e:
        logger.error(f"Gmail | get gmail error : {content} | {e}")
        return None

async def get_gmail_code(api_key: str, request_id: str) -> Union[str, None]:
    url: str = f"https://venusads.ir/api/V1/email/getCode/?key={api_key}&id={request_id}"
    content: str = None

    try:
        async with httpx.AsyncClient(timeout=25, verify=False) as client:
            response = await client.get(url)
            content: str = response.text
            response_json: dict = response.json()

            ##
            ## sample : {"status":200,"message":"Success","code":"202830","requestID":"1769108678536779"}
            ##
            telegram_code: str = response_json.get('code')

            return telegram_code
        
    except Exception as e:
        logger.error(f"Gmail | get gmail code error : {content} | {e}")
        return None

async def extract_login_code(api_url: str) -> Union[str, bool]:
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=10, verify=False) as client:
            response = await client.get(api_url)
            match = re.search(r'(\d{5})', response.text)
            return match.group(1) if match else False
    except Exception:
        return False

async def _reset_number(account: str) -> tuple:
    telegram = None
    try:
        logger.info('------------------------------------------------')

        # ----------------- Proxy ----------------- #
        proxy = parse_proxy_line(get_proxy())
        proxy_info = ':'.join(str(v) for v in proxy.values()) if proxy else 'no-proxy'

        # ----------------- Create Client ----------------- #
        client_result = await create_client_for_session(proxy=proxy, connect_timeout=60)
        if client_result is None:
            logger.error('[reset_number] Error in create_client_for_session')
            return (False, 'unknow', account)

        telegram, session_path = client_result

        # ----------------- Parse Input ----------------- #
        if '|' in account:
            parts = account.split('|')
        elif '-' in account:
            parts = account.split('-')
        else:
            logger.error(f"[reset_number] Invalid account format: {account}")
            return (False, 'invalid-account-format', account)

        if len(parts) < 2:
            logger.error(f"[reset_number] Account data incomplete: {account}")
            return (False, 'invalid-account-format', account)

        phone_number, api_url = parts[0], parts[1]
        verification_hash = None

        # ----------------- Send Code Request ----------------- #
        try:
            response = await telegram.send_code_request(phone_number)
        except Exception as send_error:
            logger.error(f"[reset_number] Failed to send code request: {send_error}")
            return (False, 'unsuccess-send-code', account)

        logger.info(f"<green>[+]</green> Proxy: {proxy_info}")
        logger.info(f'<green>[+]</green> Phone number: <yellow>{phone_number}</yellow>')
        logger.info(f'<green>[+]</green> Session path: <yellow>{session_path}</yellow>')
        logger.info(f'<green>[+]</green> Code request sent, <yellow>{getattr(response, "type", "unknown")}</yellow>')

        # =====================================================================================
        #   HANDLE SENT CODE TYPES
        # =====================================================================================

        # -------------------------------------------------------------------------------------
        # EMAIL CODE (login via email)
        # -------------------------------------------------------------------------------------
        if isinstance(response.type, types.auth.SentCodeTypeEmailCode):
            logger.info(f'<green>[+]</green> Email verification required: {response.type.email_pattern}')

            try:
                reset_response = await telegram(
                    functions.auth.ResetLoginEmailRequest(
                        phone_number=phone_number,
                        phone_code_hash=response.phone_code_hash
                    )
                )
                pending = getattr(reset_response.type, 'reset_pending_date', None)
                logger.info(f'<green>[+]</green> Email reset sent. Pending: {pending}')
                return (True, 'success-reset-email' if pending else 'unsuccess-reset-2fa', account)

            except Exception as email_error:
                logger.error(f'<red>[-]</red> Email reset error: {email_error}')
                return (False, 'unsuccess-reset-email', account)

        # -------------------------------------------------------------------------------------
        # SMS CODE (normal)
        # -------------------------------------------------------------------------------------
        elif isinstance(response.type, types.auth.SentCodeTypeSms):
            logger.info("<green>[+]</green> SentCodeTypeSms → Continue normally.")

            verification_hash = response.phone_code_hash

        # -------------------------------------------------------------------------------------
        # APP CODE (force SMS) — MODE A (requested)
        # -------------------------------------------------------------------------------------
        elif isinstance(response.type, types.auth.SentCodeTypeApp):
            logger.info("<green>[+]</green> SentCodeTypeApp → Forcing SMS request...")

            try:
                resend = await telegram(
                    functions.auth.ResendCodeRequest(
                        phone_number=phone_number,
                        phone_code_hash=response.phone_code_hash
                    )
                )
                verification_hash = resend.phone_code_hash
                logger.info("<green>[+]</green> SMS resend successfully forced.")
            except Exception as resend_error:
                logger.error(f"<red>[-]</red> Failed to force SMS resend: {resend_error}")
                return (False, 'unsuccess-sms', account)

        # -------------------------------------------------------------------------------------
        # SetUp Email
        # -------------------------------------------------------------------------------------
        elif isinstance(response.type, types.auth.SentCodeTypeSetUpEmailRequired):
            logger.info("<green>[+]</green> SentCode SetUp Email...")

            try:
                gmail: Union[Gmail, None] = await EmailKeeper.get_gmail_async(timeout=1)
                if isinstance(gmail, Gmail):
                    if gmail.used >= 2:
                        gmail = None
                
                if gmail == None:
                    for retrys in range(3):
                        gmail = await get_gmail(GMAIL_API_KEY, GMAIL_SERVICE)
                        if gmail != None:
                            if gmail.issucess and gmail.address != None:
                                break

                        logger.info(f"{phone_number} | Get Email Retry [{retrys}]...")
                        await asyncio.sleep(1)
                
                logger.info(f'{phone_number} | {gmail}')
                if gmail == None:
                    logger.error(f"{phone_number} | <red>[-]</red> Failed to get gmail")
                    return (False, 'unsuccess-setup-email', account)
            
                sent_code_mail = await telegram(
                    functions.account.SendVerifyEmailCodeRequest(
                        purpose = types.EmailVerifyPurposeLoginSetup(
                            phone_number=phone_number,
                            phone_code_hash=response.phone_code_hash
                        ),
                        email=gmail.address
                    )
                )

                logger.info(f'{phone_number} | Send Verify Email Code Request : {sent_code_mail}')

                # if gmail.used > 1:
                #     logger.info(f"{phone_number} | {gmail.address} | wait longer for second code!")
                #     await asyncio.sleep(10)

                code: Union[str, None] = None
                for retrys in range(3):
                    res_code = await get_gmail_code(GMAIL_API_KEY, gmail.request_id)
                    if isinstance(res_code, str):
                        logger.info(f'{phone_number} | {gmail.address} | received code {code}')
                        if res_code != gmail.code:
                            gmail.code = res_code
                            code = res_code
                            break

                    logger.info(f"{phone_number} | {gmail.address} | Get Email Code retry [{retrys}]...")
                    await asyncio.sleep(10)

                logger.info(f'{phone_number} | {gmail.address} | gmail code {code}')

                if code == None:
                    # EmailKeeper.add_gmail(gmail)
                    logger.error(f"{phone_number} | <red>[-]</red> Failed to get gmail code")
                    return (False, 'unsuccess-email-code', account)
                
                # EmailKeeper.add_gmail(gmail)
                verify_email = await telegram(
                    functions.account.VerifyEmailRequest(
                        purpose = types.EmailVerifyPurposeLoginSetup(
                            phone_number=phone_number,
                            phone_code_hash=response.phone_code_hash
                        ),
                        verification=types.EmailVerificationCode(code=code)
                    )
                )

                logger.info(f'{phone_number} | gmail setup {gmail.address} | {verify_email}')

                gmail.used += 1
                EmailKeeper.add_gmail(gmail)
                await asyncio.sleep(0.1)


            except Exception as resend_error:
                logger.error(f"<red>[-]</red> Failed to setup gmail: {resend_error}", )
                return (False, 'unsuccess-setup-email', account)

        else:
            logger.error(f'<red>[-]</red> Unknown SentCodeType: {type(response.type)}')
            return (False, 'unsuccess-setup-email', account)

        # =====================================================================================
        #   FETCH VERIFICATION CODE FROM URL (3 tries — 30 sec)
        # =====================================================================================
        verifaction_code = None
        for attempt in range(3):
            logger.info(f'<green>[+]</green> Waiting 10s for verification code... ({attempt+1}/3)')
            await asyncio.sleep(10)

            verifaction_code = await extract_login_code(api_url=api_url)
            if verifaction_code:
                break

        if not verifaction_code:
            logger.error('<red>[-]</red> Verification code not found after 30s.')
            return (False, 'not-found-verifaction-code', account)

        logger.info(f'<green>[+]</green> Received verification code: <yellow>{verifaction_code}</yellow>')

        # =====================================================================================
        #   SIGN IN WITH CODE
        # =====================================================================================
        try:
            response = await telegram.sign_in(
                phone=phone_number,
                code=int(verifaction_code),
                phone_code_hash=verification_hash
            )

        except errors.rpcerrorlist.SessionPasswordNeededError:
            logger.info("<green>[+]</green> 2FA enabled → resetting account...")

            try:
                await telegram(functions.account.DeleteAccountRequest(reason='F79CdnW0eyrcZvQ2'))
                logger.info("<green>[+]</green> Account reset successfully")
                return (True, 'success-reset-2fa', account)

            except Exception as pw_error:
                logger.error(f"<red>[-]</red> 2FA reset error: {pw_error}")
                return (False, 'unsuccess-reset-2fa', account)

        except Exception as sign_error:
            logger.error(f"<red>[-]</red> Sign in error: {sign_error}")
            return (False, 'unsuccess-reset-2fa', account)

        # =====================================================================================
        #   SAVE SESSION
        # =====================================================================================
        try:
            if os.path.exists(session_path):
                os.replace(session_path, f'login_sessions/{phone_number}.session')
                logger.info(f'<green>[+]</green> Session moved to login_sessions/')
            else:
                logger.warning(f'<yellow>[!]</yellow> Session file missing: {session_path}')
        except Exception as move_error:
            logger.error(f'<red>[-]</red> Session moving failed: {move_error}')

        return (True, 'login', account)

    # =========================================================================================
    #   ERRORS
    # =========================================================================================
    except errors.rpcerrorlist.PhoneNumberBannedError:
        logger.error('<red>[-]</red> Banned number.')
        return (False, 'banned', account)

    except errors.rpcerrorlist.PhoneNumberFloodError:
        logger.error('<red>[-]</red> Flood error.')
        return (False, 'flooded', account)

    except errors.rpcerrorlist.PhoneNumberInvalidError:
        logger.error('<red>[-]</red> Invalid number.')
        return (False, 'invalid-number', account)

    except Exception as error:
        logger.error(f'[reset_number] Unexpected error -> {error}')
        return (False, 'unknow', account)

    finally:
        if telegram:
            try:
                await telegram.disconnect()
            except:
                pass

async def reset_number(account: str) -> tuple:
    try:
        res = await asyncio.wait_for(_reset_number(account), timeout=120)
        return res
    
    except asyncio.TimeoutError:
        return (False, 'unknow', account)

async def reset_numbers(user_id: int, accounts: list, process_message: Message) -> None:
    try:
        status = {
            'login': [],
            'unsuccess-email-code' : [],
            'unsuccess-setup-email' : [],
            'success-reset-email': [], 'unsuccess-reset-email': [],
            'success-reset-2fa': [], 'unsuccess-reset-2fa': [],
            'not-found-verifaction-code': [], 'unsuccess-sms':[], 
            'banned': [], 'flooded': [], 'unknow': []
        }
        
        chunked_accounts = chunk_generate(accounts, CHUNK)
        # total_chunk = len(chunked_accounts)

        for chunked in chunked_accounts:
            tasks = asyncio.gather(*(reset_number(x) for x in chunked))
            await tasks

            logger.info(tasks.result())

            for response in tasks.result():
                account = response[2]
                # logger.info(f'<green>[response]</green> : {response}')
                # --------------- [ Check response (statuses) ] --------------- #
                
                status_key_map = {
                    ('login', True): 'login',
                    ('unsuccess-email-code', False): 'unsuccess-email-code',
                    ('unsuccess-setup-email', False): 'unsuccess-setup-email',
                    ('success-reset-email', True): 'success-reset-email',
                    ('unsuccess-reset-email', False): 'unsuccess-reset-email',
                    ('success-reset-2fa', True): 'success-reset-2fa',
                    ('unsuccess-reset-2fa', False): 'unsuccess-reset-2fa',
                    ('not-found-verifaction-code', False): 'not-found-verifaction-code',
                    ('unsuccess-sms', False): 'unsuccess-sms',
                    ('banned', False): 'banned',
                    ('flooded', False): 'flooded',
                    ('unknow', False): 'unknow'
                }
                key = status_key_map.get((response[1], response[0]))
                if key:
                    status[key].append(account)
            
            try:
                total_done = sum(len(v) for v in status.values())
                await process_message.edit(f'<b>✅ File analyzed, resetting ...\n\n• Total Numbers: <code>{len(accounts)}</code>\n• Processed: <code>{total_done}/{len(accounts)}</code></b>')
            except:
                pass
        
        try:
            await process_message.edit(f'<b>✅ File analyzed, Finished!\n\n• Total Numbers: <code>{len(accounts)}</code>\n• Processed: <code>{len(accounts)}/{len(accounts)}</code></b>')
        except:
            pass
        
        # ------------------- [ Save result in files ] ------------------- #
        
        if len(status['login']) > 0:
            sessions_folder = 'login_sessions'
            with zipfile.ZipFile(f'({user_id})-(sessions).zip', 'w') as zipf:
                if os.path.exists(sessions_folder):
                    for fname in os.listdir(sessions_folder):
                        fpath = os.path.join(sessions_folder, fname)
                        if fname.endswith('.session') and os.path.isfile(fpath):
                            zipf.write(fpath, arcname=fname)
                            try:
                                os.remove(fpath)
                            except Exception as e:
                                logger.error(f"Error removing session file {fpath}: {e}")
        
        for key, fname in status.items():
            if len(status[key]) > 0:
                with open(f'({user_id})-({key}).txt', 'w', encoding='utf-8') as f:
                    f.write('\n'.join(status[key]))
            
    except Exception as error:
        logger.exception(f'[reset_numbers] Error resetting numbers -> {error}')
