此代码为走投无路 跑路必备代码
在linux中我们使用python这就不细讲了
比如你在任意设备打开CMD界面或者手机ssh程序
首先我们ssh到我们的服务器
随后我们执行代码文件
功能简单但是强大 能实时获取消息
选择你的联系人或者输入联系人的TG号都可以找到他
也有监听Telegram系统通知功能,.获取验证码,不怕手机或者卡的问题损失账号
代码如下,论坛好像没有嵌入功能,所以各位可以用AI重新调整一下缩进
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys, io, os, re, asyncio, logging, socket
from datetime import datetime
from contextlib import suppress
from unicodedata import normalize
from telethon import TelegramClient, events
from telethon.tl.types import User
from telethon.errors import RPCError, SessionPasswordNeededError
# ---------- 日志 ----------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("tg-cli")
# ---------- 统一 I/O 编码,避免手机 SSH 乱码 ----------
try:
if hasattr(sys.stdin, "reconfigure"):
sys.stdin.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
except Exception:
sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8', errors='replace')
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
def uinput(prompt: str = "") -> str:
try:
return input(prompt)
except UnicodeDecodeError:
return sys.stdin.readline().rstrip("\n")
async def auinput(prompt: str = "") -> str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: uinput(prompt))
# ============== 配置(建议用环境变量) ==============
API_ID = int(os.getenv("TG_API_ID", "这里输入你的API"))
API_HASH = os.getenv("TG_API_HASH", "这里输入你的API")
SESSION_NAME = os.getenv("TG_SESSION", "session_name")
# 如需代理,按需启用(示例:本地 1080 socks5)
# from telethon.network.connection.tcpabridged import ConnectionTcpAbridged
# PROXY = ('socks5', '127.0.0.1', 1080)
# 之后在 TelegramClient(...) 里传 connection=ConnectionTcpAbridged, proxy=PROXY
# ---------- 小工具 ----------
def _norm(s: str) -> str:
return re.sub(r"\s+", "", normalize("NFKC", s or "")).lower()
def fmt_msg(m):
t = m.date.strftime("%H:%M:%S") if getattr(m, "date", None) else "--:--:--"
body = (m.message or "").replace("\r", "")
sender = "你" if getattr(m, "out", False) else (str(m.sender_id) if getattr(m, "sender_id", None) else "对方")
return f"[{t}] {sender}: {body}"
async def send_with_retry(coro_factory, retries=1):
for i in range(retries + 1):
try:
return await coro_factory()
except RPCError as e:
if i < retries:
await asyncio.sleep(0.8)
continue
raise e
async def ensure_login(client: TelegramClient):
log.info("正在连接 Telegram...")
try:
# 不再包 20s 的 wait_for,避免“刚连上又被 wait_for 截断”
await client.connect()
except (asyncio.TimeoutError, socket.gaierror, ConnectionError) as e:
log.error("连接 Telegram 失败:%s", e)
log.error("可能是网络被拦/无外网/需要代理。检查防火墙、时间、DNS或配置代理。")
raise
if not await client.is_user_authorized():
log.info("首次登录:需要手机号与验证码。")
phone = uinput("请输入手机号(含国家码,如 +855...): ").strip()
sent = await client.send_code_request(phone)
code = uinput("请输入验证码(不含空格): ").strip()
try:
await client.sign_in(phone, code)
except SessionPasswordNeededError:
pwd = uinput("已开启两步验证,请输入密码: ").strip()
await client.sign_in(password=pwd)
log.info("登录成功。")
else:
log.info("已授权。")
# ----------------- 只联系人(非机器人)功能封装 -----------------
class ContactsOnlyApp:
def __init__(self, client: TelegramClient):
self.client = client
self.contacts = {} # {'uid_str': {id,name,username}}
self._typing_task = None
self._typing_on = False
self._msg_handler = None
async def list_contacts(self):
self.contacts = {}
async for dialog in self.client.iter_dialogs():
e = dialog.entity
if isinstance(e, User) and not e.bot:
name = f"{e.first_name or ''} {e.last_name or ''}".strip() or "(无名)"
self.contacts[str(e.id)] = {'id': e.id, 'name': name, 'username': e.username or ''}
if self.contacts:
print("========= 联系人列表(仅真人) =========")
for idx, (uid, info) in enumerate(self.contacts.items(), 1):
uname = f"@{info['username']}" if info['username'] else ""
print(f"[{idx}] {info['name']} ({uname}) - id: {info['id']}")
else:
print("你目前没有联系人。")
async def list_unread_private(self):
print("======= 未读联系人消息 =======")
found = False
async for dialog in self.client.iter_dialogs():
e = dialog.entity
if isinstance(e, User) and not e.bot and dialog.unread_count > 0:
name = f"{e.first_name or ''} {e.last_name or ''}".strip() or "(无名)"
print(f"{name}(未读: {dialog.unread_count})")
found = True
if not found:
print("没有任何未读的联系人消息。")
def resolve_contact(self, user_input: str):
s = (user_input or "").strip()
if not s:
return None
# 序号
if s.isdigit():
idx = int(s) - 1
if 0 <= idx < len(self.contacts):
return list(self.contacts.keys())[idx]
# 精确 id
if s.isdigit() and s in self.contacts:
return s
# @username
if s.startswith("@"):
s2 = s[1:].lower()
for uid, info in self.contacts.items():
if (info.get("username") or "").lower() == s2:
return uid
# 模糊姓名
ns = _norm(s)
cand = [uid for uid, info in self.contacts.items() if ns in _norm(info.get("name", ""))]
return cand[0] if len(cand) == 1 else None
async def chat_with_contact(self):
await self.list_contacts()
user_input = uinput("请输入编号 / id / @用户名 / 模糊姓名: ").strip()
user_id = self.resolve_contact(user_input) or user_input
contact = self.contacts.get(str(user_id))
if not contact:
print("未找到该联系人(或匹配多个),请更具体一点。")
return
entity = await self.client.get_entity(int(user_id))
if not isinstance(entity, User) or getattr(entity, "bot", False):
print("该对象不是联系人(或是机器人),已拦截。")
return
print(f"你正在和 {contact['name']} (@{contact['username']}) 聊天")
await self.chat_loop(entity)
async def chat_with_username(self):
username = uinput("请输入用户名(不带@):").strip()
if not username:
print("用户名不能为空。")
return
try:
entity = await self.client.get_entity(username)
if not isinstance(entity, User) or getattr(entity, "bot", False):
print("该对象不是联系人(或是机器人),已拦截。")
return
name = f"{getattr(entity, 'first_name', '') or ''} {getattr(entity, 'last_name', '') or ''}".strip() or "(无名)"
print(f"你正在和 {name} (@{username}) 聊天")
await self.chat_loop(entity)
except Exception as e:
print(f"无法获取该用户名:{e}")
async def chat_loop(self, entity):
msgs = await self.client.get_messages(entity, limit=10)
print("======= 最近 10 条消息(联系人) =======")
for m in reversed(msgs):
if m.message:
print(fmt_msg(m))
@self.client.on(events.NewMessage(chats=entity))
async def _on_new_message(event):
if not event.out:
print("\n" + fmt_msg(event.message))
print("你: ", end="", flush=True)
self._msg_handler = _on_new_message
self._typing_on = False
self._typing_task = None
async def typing_loop():
from telethon.tl.functions.messages import SetTypingRequest
from telethon.tl.types import SendMessageTypingAction
while self._typing_on:
with suppress(Exception):
await self.client(SetTypingRequest(peer=entity, action=SendMessageTypingAction()))
await asyncio.sleep(4)
print("输入 `/help` 查看可用命令。")
try:
while True:
text = await auinput("你: ")
if not text:
continue
s = text.strip()
if not s:
continue
if s.startswith("/"):
cmd, *rest = s[1:].split()
arg = " ".join(rest)
if cmd in ("exit", "quit", "q"):
print("退出聊天。")
break
elif cmd in ("n", "refresh"):
msgs = await self.client.get_messages(entity, limit=10)
print("======= 刷新最新 10 条 =======")
for m in reversed(msgs):
if m.message:
print(fmt_msg(m))
elif cmd == "last":
n = int(arg or "50")
n = max(1, min(n, 200))
msgs = await self.client.get_messages(entity, limit=n)
print(f"======= 最近 {n} 条消息 =======")
for m in reversed(msgs):
if m.message:
print(fmt_msg(m))
elif cmd == "del":
n = int(arg or "100")
n = max(1, min(n, 1000))
msgs = await self.client.get_messages(entity, limit=n)
ids = [m.id for m in msgs]
if ids:
await send_with_retry(lambda: self.client.delete_messages(entity, ids))
print(f"已删除最近 {len(ids)} 条消息(单向撤回)。")
else:
print("没有消息可删除。")
elif cmd == "delchat":
from telethon.tl.functions.messages import DeleteHistoryRequest
confirm = uinput("⚠ 确定要删除双方聊天记录?(y/n): ").strip().lower()
if confirm == "y":
try:
await self.client(DeleteHistoryRequest(peer=entity, max_id=2**31-1, revoke=True))
print("✅ 已删除双方聊天记录。")
break
except Exception as e:
print(f"❌ 删除失败: {e}")
else:
print("取消删除。")
elif cmd == "file":
path = arg.strip().strip("\"'")
if not path:
print("用法:/file /path/to/file")
continue
if not os.path.exists(path):
print("文件不存在。")
continue
await send_with_retry(lambda: self.client.send_file(entity, path))
print("已发送文件。")
elif cmd == "typing":
v = (arg or "").lower()
if v in ("on", "1", "true"):
if not self._typing_on:
self._typing_on = True
self._typing_task = asyncio.create_task(typing_loop())
print("已开启正在输入提示。")
elif v in ("off", "0", "false"):
self._typing_on = False
if self._typing_task:
self._typing_task.cancel()
self._typing_task = None
print("已关闭正在输入提示。")
else:
print("用法:/typing on|off")
elif cmd == "read":
await self.client.send_read_acknowledge(entity)
print("已标记为已读。")
elif cmd == "help":
print("""可用命令(仅联系人):
/n, /refresh 刷新最近 10 条
/last N 显示最近 N 条(默认 50,最大 200)
/del N 删除最近 N 条(单向撤回,默认 100,最大 1000)
/delchat 删除双方聊天记录(撤回对方端)
/file /path/to/file 发送文件/图片/视频
/typing on|off 打开/关闭“正在输入”提示
/read 将未读标记为已读
/exit 退出聊天
""")
else:
print("未知命令,输入 /help 查看帮助。")
continue
# 普通文本消息
await send_with_retry(lambda: self.client.send_message(entity, s))
class _Echo: pass
x = _Echo(); x.out=True; x.sender_id=None; x.date=datetime.now(); x.message=s
print(fmt_msg(x))
except KeyboardInterrupt:
print("\n已中断,退出聊天。")
finally:
with suppress(Exception):
if self._typing_task:
self._typing_task.cancel()
with suppress(Exception):
if self._msg_handler:
self.client.remove_event_handler(self._msg_handler)
# ----------------- 主程序 -----------------
async def main():
# 在这里创建 client,绑定当前事件循环;退出时自动断开
# 如需代理,把上面注释的 PROXY/Connection 打开并传入这里
async with TelegramClient(SESSION_NAME, API_ID, API_HASH) as client:
await ensure_login(client)
app = ContactsOnlyApp(client)
while True:
print("""
============== Telegram CLI(仅联系人) ==============
1. 列出联系人(仅真人)
2. 与联系人聊天
3. 退出
4. 查看未读联系人消息
5. 通过用户名聊天(仅联系人)
""")
choice = uinput("请输入功能编号: ").strip()
if choice == "1":
await app.list_contacts()
elif choice == "2":
await app.chat_with_contact()
elif choice == "3":
print("再见!")
break
elif choice == "4":
await app.list_unread_private()
elif choice == "5":
await app.chat_with_username()
else:
print("无效输入。")
if __name__ == "__main__":
# 用 -u 或 PYTHONUNBUFFERED=1 运行可实时刷日志:python3 -u app.py
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nBye.")



