分享一个云端Tlemegram命令行使用方法

此代码为走投无路 跑路必备代码
在linux中我们使用python这就不细讲了
比如你在任意设备打开CMD界面或者手机ssh程序

首先我们ssh到我们的服务器

随后我们执行代码文件


功能简单但是强大 能实时获取消息

选择你的联系人或者输入联系人的TG号都可以找到他

也有监听Telegram系统通知功能,.获取验证码,不怕手机或者卡的问题损失账号

代码如下,论坛好像没有嵌入功能,所以各位可以用AI重新调整一下缩进

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import sys, io, os, re, asyncio, logging, socket
from datetime import datetime
from contextlib import suppress
from unicodedata import normalize


from telethon import TelegramClient, events
from telethon.tl.types import User
from telethon.errors import RPCError, SessionPasswordNeededError


# ---------- 日志 ----------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("tg-cli")


# ---------- 统一 I/O 编码,避免手机 SSH 乱码 ----------
try:
   if hasattr(sys.stdin, "reconfigure"):
       sys.stdin.reconfigure(encoding='utf-8', errors='replace')
   if hasattr(sys.stdout, "reconfigure"):
       sys.stdout.reconfigure(encoding='utf-8', errors='replace')
except Exception:
   sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8', errors='replace')
   sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')


def uinput(prompt: str = "") -> str:
   try:
       return input(prompt)
   except UnicodeDecodeError:
       return sys.stdin.readline().rstrip("\n")


async def auinput(prompt: str = "") -> str:
   loop = asyncio.get_event_loop()
   return await loop.run_in_executor(None, lambda: uinput(prompt))


# ============== 配置(建议用环境变量) ==============
API_ID  = int(os.getenv("TG_API_ID", "这里输入你的API"))
API_HASH = os.getenv("TG_API_HASH", "这里输入你的API")
SESSION_NAME = os.getenv("TG_SESSION", "session_name")


# 如需代理,按需启用(示例:本地 1080 socks5)
# from telethon.network.connection.tcpabridged import ConnectionTcpAbridged
# PROXY = ('socks5', '127.0.0.1', 1080)
# 之后在 TelegramClient(...) 里传 connection=ConnectionTcpAbridged, proxy=PROXY


# ---------- 小工具 ----------
def _norm(s: str) -> str:
   return re.sub(r"\s+", "", normalize("NFKC", s or "")).lower()


def fmt_msg(m):
   t = m.date.strftime("%H:%M:%S") if getattr(m, "date", None) else "--:--:--"
   body = (m.message or "").replace("\r", "")
   sender = "你" if getattr(m, "out", False) else (str(m.sender_id) if getattr(m, "sender_id", None) else "对方")
   return f"[{t}] {sender}: {body}"


async def send_with_retry(coro_factory, retries=1):
   for i in range(retries + 1):
       try:
           return await coro_factory()
       except RPCError as e:
           if i < retries:
               await asyncio.sleep(0.8)
               continue
           raise e


async def ensure_login(client: TelegramClient):
   log.info("正在连接 Telegram...")
   try:
       # 不再包 20s 的 wait_for,避免“刚连上又被 wait_for 截断”
       await client.connect()
   except (asyncio.TimeoutError, socket.gaierror, ConnectionError) as e:
       log.error("连接 Telegram 失败:%s", e)
       log.error("可能是网络被拦/无外网/需要代理。检查防火墙、时间、DNS或配置代理。")
       raise


   if not await client.is_user_authorized():
       log.info("首次登录:需要手机号与验证码。")
       phone = uinput("请输入手机号(含国家码,如 +855...): ").strip()
       sent = await client.send_code_request(phone)
       code = uinput("请输入验证码(不含空格): ").strip()
       try:
           await client.sign_in(phone, code)
       except SessionPasswordNeededError:
           pwd = uinput("已开启两步验证,请输入密码: ").strip()
           await client.sign_in(password=pwd)
       log.info("登录成功。")
   else:
       log.info("已授权。")


# ----------------- 只联系人(非机器人)功能封装 -----------------
class ContactsOnlyApp:
   def __init__(self, client: TelegramClient):
       self.client = client
       self.contacts = {}  # {'uid_str': {id,name,username}}
       self._typing_task = None
       self._typing_on = False
       self._msg_handler = None


   async def list_contacts(self):
       self.contacts = {}
       async for dialog in self.client.iter_dialogs():
           e = dialog.entity
           if isinstance(e, User) and not e.bot:
               name = f"{e.first_name or ''} {e.last_name or ''}".strip() or "(无名)"
               self.contacts[str(e.id)] = {'id': e.id, 'name': name, 'username': e.username or ''}
       if self.contacts:
           print("========= 联系人列表(仅真人) =========")
           for idx, (uid, info) in enumerate(self.contacts.items(), 1):
               uname = f"@{info['username']}" if info['username'] else ""
               print(f"[{idx}] {info['name']} ({uname}) - id: {info['id']}")
       else:
           print("你目前没有联系人。")


   async def list_unread_private(self):
       print("======= 未读联系人消息 =======")
       found = False
       async for dialog in self.client.iter_dialogs():
           e = dialog.entity
           if isinstance(e, User) and not e.bot and dialog.unread_count > 0:
               name = f"{e.first_name or ''} {e.last_name or ''}".strip() or "(无名)"
               print(f"{name}(未读: {dialog.unread_count})")
               found = True
       if not found:
           print("没有任何未读的联系人消息。")


   def resolve_contact(self, user_input: str):
       s = (user_input or "").strip()
       if not s:
           return None
       # 序号
       if s.isdigit():
           idx = int(s) - 1
           if 0 <= idx < len(self.contacts):
               return list(self.contacts.keys())[idx]
       # 精确 id
       if s.isdigit() and s in self.contacts:
           return s
       # @username
       if s.startswith("@"):
           s2 = s[1:].lower()
           for uid, info in self.contacts.items():
               if (info.get("username") or "").lower() == s2:
                   return uid
       # 模糊姓名
       ns = _norm(s)
       cand = [uid for uid, info in self.contacts.items() if ns in _norm(info.get("name", ""))]
       return cand[0] if len(cand) == 1 else None


   async def chat_with_contact(self):
       await self.list_contacts()
       user_input = uinput("请输入编号 / id / @用户名 / 模糊姓名: ").strip()
       user_id = self.resolve_contact(user_input) or user_input
       contact = self.contacts.get(str(user_id))
       if not contact:
           print("未找到该联系人(或匹配多个),请更具体一点。")
           return


       entity = await self.client.get_entity(int(user_id))
       if not isinstance(entity, User) or getattr(entity, "bot", False):
           print("该对象不是联系人(或是机器人),已拦截。")
           return


       print(f"你正在和 {contact['name']} (@{contact['username']}) 聊天")
       await self.chat_loop(entity)


   async def chat_with_username(self):
       username = uinput("请输入用户名(不带@):").strip()
       if not username:
           print("用户名不能为空。")
           return
       try:
           entity = await self.client.get_entity(username)
           if not isinstance(entity, User) or getattr(entity, "bot", False):
               print("该对象不是联系人(或是机器人),已拦截。")
               return
           name = f"{getattr(entity, 'first_name', '') or ''} {getattr(entity, 'last_name', '') or ''}".strip() or "(无名)"
           print(f"你正在和 {name} (@{username}) 聊天")
           await self.chat_loop(entity)
       except Exception as e:
           print(f"无法获取该用户名:{e}")


   async def chat_loop(self, entity):
       msgs = await self.client.get_messages(entity, limit=10)
       print("======= 最近 10 条消息(联系人) =======")
       for m in reversed(msgs):
           if m.message:
               print(fmt_msg(m))


       @self.client.on(events.NewMessage(chats=entity))
       async def _on_new_message(event):
           if not event.out:
               print("\n" + fmt_msg(event.message))
               print("你: ", end="", flush=True)


       self._msg_handler = _on_new_message
       self._typing_on = False
       self._typing_task = None


       async def typing_loop():
           from telethon.tl.functions.messages import SetTypingRequest
           from telethon.tl.types import SendMessageTypingAction
           while self._typing_on:
               with suppress(Exception):
                   await self.client(SetTypingRequest(peer=entity, action=SendMessageTypingAction()))
               await asyncio.sleep(4)


       print("输入 `/help` 查看可用命令。")
       try:
           while True:
               text = await auinput("你: ")
               if not text:
                   continue
               s = text.strip()
               if not s:
                   continue


               if s.startswith("/"):
                   cmd, *rest = s[1:].split()
                   arg = " ".join(rest)


                   if cmd in ("exit", "quit", "q"):
                       print("退出聊天。")
                       break


                   elif cmd in ("n", "refresh"):
                       msgs = await self.client.get_messages(entity, limit=10)
                       print("======= 刷新最新 10 条 =======")
                       for m in reversed(msgs):
                           if m.message:
                               print(fmt_msg(m))


                   elif cmd == "last":
                       n = int(arg or "50")
                       n = max(1, min(n, 200))
                       msgs = await self.client.get_messages(entity, limit=n)
                       print(f"======= 最近 {n} 条消息 =======")
                       for m in reversed(msgs):
                           if m.message:
                               print(fmt_msg(m))


                   elif cmd == "del":
                       n = int(arg or "100")
                       n = max(1, min(n, 1000))
                       msgs = await self.client.get_messages(entity, limit=n)
                       ids = [m.id for m in msgs]
                       if ids:
                           await send_with_retry(lambda: self.client.delete_messages(entity, ids))
                           print(f"已删除最近 {len(ids)} 条消息(单向撤回)。")
                       else:
                           print("没有消息可删除。")


                   elif cmd == "delchat":
                       from telethon.tl.functions.messages import DeleteHistoryRequest
                       confirm = uinput("⚠ 确定要删除双方聊天记录?(y/n): ").strip().lower()
                       if confirm == "y":
                           try:
                               await self.client(DeleteHistoryRequest(peer=entity, max_id=2**31-1, revoke=True))
                               print("✅ 已删除双方聊天记录。")
                               break
                           except Exception as e:
                               print(f"❌ 删除失败: {e}")
                       else:
                           print("取消删除。")


                   elif cmd == "file":
                       path = arg.strip().strip("\"'")
                       if not path:
                           print("用法:/file /path/to/file")
                           continue
                       if not os.path.exists(path):
                           print("文件不存在。")
                           continue
                       await send_with_retry(lambda: self.client.send_file(entity, path))
                       print("已发送文件。")


                   elif cmd == "typing":
                       v = (arg or "").lower()
                       if v in ("on", "1", "true"):
                           if not self._typing_on:
                               self._typing_on = True
                               self._typing_task = asyncio.create_task(typing_loop())
                           print("已开启正在输入提示。")
                       elif v in ("off", "0", "false"):
                           self._typing_on = False
                           if self._typing_task:
                               self._typing_task.cancel()
                               self._typing_task = None
                           print("已关闭正在输入提示。")
                       else:
                           print("用法:/typing on|off")


                   elif cmd == "read":
                       await self.client.send_read_acknowledge(entity)
                       print("已标记为已读。")


                   elif cmd == "help":
                       print("""可用命令(仅联系人):
 /n, /refresh          刷新最近 10 条
 /last N               显示最近 N 条(默认 50,最大 200)
 /del N                删除最近 N 条(单向撤回,默认 100,最大 1000)
 /delchat              删除双方聊天记录(撤回对方端)
 /file /path/to/file   发送文件/图片/视频
 /typing on|off        打开/关闭“正在输入”提示
 /read                 将未读标记为已读
 /exit                 退出聊天
""")
                   else:
                       print("未知命令,输入 /help 查看帮助。")
                   continue


               # 普通文本消息
               await send_with_retry(lambda: self.client.send_message(entity, s))
               class _Echo: pass
               x = _Echo(); x.out=True; x.sender_id=None; x.date=datetime.now(); x.message=s
               print(fmt_msg(x))


       except KeyboardInterrupt:
           print("\n已中断,退出聊天。")
       finally:
           with suppress(Exception):
               if self._typing_task:
                   self._typing_task.cancel()
           with suppress(Exception):
               if self._msg_handler:
                   self.client.remove_event_handler(self._msg_handler)


# ----------------- 主程序 -----------------
async def main():
   # 在这里创建 client,绑定当前事件循环;退出时自动断开
   # 如需代理,把上面注释的 PROXY/Connection 打开并传入这里
   async with TelegramClient(SESSION_NAME, API_ID, API_HASH) as client:
       await ensure_login(client)
       app = ContactsOnlyApp(client)


       while True:
           print("""
============== Telegram CLI(仅联系人) ==============
1. 列出联系人(仅真人)
2. 与联系人聊天
3. 退出
4. 查看未读联系人消息
5. 通过用户名聊天(仅联系人)
""")
           choice = uinput("请输入功能编号: ").strip()
           if choice == "1":
               await app.list_contacts()
           elif choice == "2":
               await app.chat_with_contact()
           elif choice == "3":
               print("再见!")
               break
           elif choice == "4":
               await app.list_unread_private()
           elif choice == "5":
               await app.chat_with_username()
           else:
               print("无效输入。")


if __name__ == "__main__":
   # 用 -u 或 PYTHONUNBUFFERED=1 运行可实时刷日志:python3 -u app.py
   try:
       asyncio.run(main())
   except KeyboardInterrupt:
       print("\nBye.")

8 个赞

这回水平牛:cow_face:,满满一大页

牛逼,就是贴的代码断层了

大佬

这次可以啊,框框一顿水,这次认真写了满满一页

1 个赞

666

学到了

感谢大佬分享

666

这就是专业,哈哈

下面代码看不懂,

真大佬啊

羡慕技术

秃头老哥,又见面,哈哈

大佬请问这算不算容易被封号的人形bot

我用半年了 不会封号

1 个赞

好像有代码块功能的

abc
cbd
dbv

敲3个1前面的键,触发代码块

完全看不懂啊

牛的

好高端