import tkinter as tk
from tkinter import ttk, messagebox
import socket
import threading
from flask import Flask, Response, request
import mss
import mss.tools
import os
import sys
import time
import subprocess
from PIL import Image
import io
import queue
import json
import ctypes
import numpy as np
import cv2 # 添加OpenCV用于高效圖像處理
class ScreenMirrorApp:
    """Tkinter front end that streams one monitor to web browsers as MJPEG.

    A capture thread grabs the selected monitor with ``mss``, scales and
    JPEG-encodes it with OpenCV and publishes the newest frame into a
    one-slot queue.  A Flask server (running in its own daemon thread)
    serves a viewer page on ``/`` and the frame stream on ``/screen``.

    Tk objects are only touched from the Tk main thread: worker threads hand
    their UI updates to ``_post_ui`` and the main loop applies them.
    """

    PORT = 5000
    FIREWALL_RULE_NAME = "ScreenMirrorPort5000"
    UI_POLL_MS = 50  # how often the Tk thread applies queued UI updates

    # Viewer page served on "/".  Kept verbatim as a class constant so that
    # start_mirroring() stays readable.
    INDEX_HTML = """
<html>
<head>
<title></title>
<meta charset="UTF-8">
<style>
body {
text-align: center;
background-color: #000;
margin: 0;
padding: 0;
overflow: hidden;
}
h1 {
color: #333;
margin-top: 10px;
}
img {
max-width: 100%;
max-height: 90vh;
box-shadow: 0 0 10px rgba(0,0,0,0.3);
cursor: pointer;
}
.status {
margin: 10px;
padding: 8px;
background-color: #e0e0e0;
border-radius: 5px;
}
.stats {
margin: 5px;
padding: 5px;
background-color: #d0d0d0;
border-radius: 5px;
display: inline-block;
}
.controls {
position: fixed;
top: 10px;
right: 10px;
background: rgba(255, 255, 255, 0.7);
padding: 5px;
border-radius: 5px;
}
.fullscreen-btn {
background: #4CAF50;
color: white;
border: none;
padding: 5px 10px;
border-radius: 3px;
cursor: pointer;
}
.fullscreen-btn:hover {
background: #45a049;
}
</style>
<script>
let lastFrameTime = 0;
let frameTimes = [];
let fps = 0;
let latency = 0;
let isFullscreen = false;
let connectionId = Date.now(); // 唯一連接ID
function updateImage() {
const img = document.getElementById('screenImg');
const status = document.getElementById('status');
const fpsDisplay = document.getElementById('fps');
const latencyDisplay = document.getElementById('latency');
// 添加時間戳防止緩存
const startTime = performance.now();
img.src = '/screen?cid=' + connectionId + '&t=' + new Date().getTime();
img.onload = function() {
const endTime = performance.now();
latency = endTime - startTime;
// 計算FPS
const now = performance.now();
if (lastFrameTime > 0) {
const delta = now - lastFrameTime;
frameTimes.push(delta);
if (frameTimes.length > 10) frameTimes.shift();
const avgDelta = frameTimes.reduce((a, b) => a + b, 0) / frameTimes.length;
fps = Math.round(1000 / avgDelta);
}
lastFrameTime = now;
// 更新顯示
status.textContent = '投屏中...';
fpsDisplay.textContent = `FPS: ${fps}`;
latencyDisplay.textContent = `延遲: ${Math.round(latency)}ms`;
setTimeout(updateImage, 10);
};
img.onerror = function() {
status.textContent = '連接失敗,正在重試...';
setTimeout(updateImage, 1000);
};
}
function toggleFullscreen() {
const img = document.getElementById('screenImg');
if (!isFullscreen) {
if (img.requestFullscreen) {
img.requestFullscreen();
} else if (img.mozRequestFullScreen) {
img.mozRequestFullScreen();
} else if (img.webkitRequestFullscreen) {
img.webkitRequestFullscreen();
} else if (img.msRequestFullscreen) {
img.msRequestFullscreen();
}
isFullscreen = true;
document.getElementById('fullscreenBtn').textContent = '退出全屏';
} else {
if (document.exitFullscreen) {
document.exitFullscreen();
} else if (document.mozCancelFullScreen) {
document.mozCancelFullScreen();
} else if (document.webkitExitFullscreen) {
document.webkitExitFullscreen();
} else if (document.msExitFullscreen) {
document.msExitFullscreen();
}
isFullscreen = false;
document.getElementById('fullscreenBtn').textContent = '全屏顯示';
}
}
// 監聽全屏狀態變化
document.addEventListener('fullscreenchange', handleFullscreenChange);
document.addEventListener('webkitfullscreenchange', handleFullscreenChange);
document.addEventListener('mozfullscreenchange', handleFullscreenChange);
document.addEventListener('MSFullscreenChange', handleFullscreenChange);
function handleFullscreenChange() {
isFullscreen = !!(document.fullscreenElement ||
document.webkitFullscreenElement ||
document.mozFullScreenElement ||
document.msFullscreenElement);
document.getElementById('fullscreenBtn').textContent = isFullscreen ? '退出全屏' : '全屏顯示';
}
// 雙擊圖像切換全屏
function setupDoubleClick() {
const img = document.getElementById('screenImg');
img.addEventListener('dblclick', toggleFullscreen);
}
window.onload = function() {
updateImage();
setupDoubleClick();
};
</script>
</head>
<body>
<div class="controls">
<button id="fullscreenBtn" class="fullscreen-btn" onclick="toggleFullscreen()">全屏顯示</button>
</div>
<div id="status" class="status">正在連接...</div>
<div class="stats">
<span id="fps">FPS: --</span> |
<span id="latency">延遲: --</span>
</div>
<img id="screenImg" />
</body>
</html>
"""

    def __init__(self, root):
        """Build the window, probe the environment and restore saved settings.

        Args:
            root: The ``tk.Tk`` instance that hosts the application.
        """
        self.root = root
        self.root.title("低延遲屏幕投屏工具")
        self.root.geometry("550x550")
        self.root.resizable(False, False)
        self.set_icon()
        self.ip_address = self.get_ip_address()

        # Streaming state.  Created before the widgets so that no callback
        # can ever observe a half-initialised object.
        self.mirroring = False
        self.server_thread = None
        self.flask_app = None
        self.last_frame = None
        self.frame_queue = queue.Queue(maxsize=1)
        self.capture_thread = None
        self.stop_capture = threading.Event()

        # Default performance parameters (overwritten from the sliders when
        # mirroring starts).
        self.quality = 70
        self.frame_rate = 30
        self.resolution = 0.7

        # Counters used by update_performance().
        self.frame_count = 0
        self.start_time = time.time()

        self.active_clients = set()

        # UI updates requested by worker threads, applied on the Tk thread.
        self._ui_queue = queue.Queue()
        # Identifier of the pending update_performance() timer, if any.
        self._perf_job = None

        self.create_widgets()
        self.monitors = self.get_monitors()
        self.add_firewall_rule()
        self.black_image = self.create_black_image()
        self.load_config()
        self._drain_ui_queue()

    # ------------------------------------------------------------------
    # Thread-safe UI plumbing
    # ------------------------------------------------------------------
    def _post_ui(self, callback, *args):
        """Queue ``callback(*args)`` to be run on the Tk main thread.

        Safe to call from any thread; it only touches a ``queue.Queue``.
        """
        self._ui_queue.put((callback, args))

    def _run_pending_ui(self):
        """Run every queued UI callback; must be called on the Tk thread."""
        while True:
            try:
                callback, args = self._ui_queue.get_nowait()
            except queue.Empty:
                return
            try:
                callback(*args)
            except tk.TclError:
                # The target widget/variable no longer exists (window is
                # being torn down); nothing useful can be done with it.
                pass

    def _drain_ui_queue(self):
        """Apply queued UI updates and re-arm the polling timer."""
        self._run_pending_ui()
        self.root.after(self.UI_POLL_MS, self._drain_ui_queue)

    # ------------------------------------------------------------------
    # Environment helpers
    # ------------------------------------------------------------------
    def create_black_image(self):
        """Return the bytes of a 100x100 black JPEG (sent after stopping)."""
        buffer = io.BytesIO()
        Image.new('RGB', (100, 100), (0, 0, 0)).save(
            buffer, format='JPEG', quality=1
        )
        return buffer.getvalue()

    def set_icon(self):
        """Set the window icon from ``icon.ico``; silently skip if unavailable.

        In a frozen build the icon is looked up in ``sys._MEIPASS`` when that
        attribute exists, otherwise relative to the working directory.
        """
        base_dir = ''
        if getattr(sys, 'frozen', False):
            base_dir = getattr(sys, '_MEIPASS', '')
        try:
            self.root.iconbitmap(os.path.join(base_dir, 'icon.ico'))
        except tk.TclError:
            # Best effort only: a missing or unreadable icon is not an error.
            pass

    def get_ip_address(self):
        """Return the local IPv4 address used for outbound traffic.

        Connecting a UDP socket sends no packets; it only makes the OS pick
        the outgoing interface.  Falls back to ``"127.0.0.1"`` on any socket
        error.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("8.8.8.8", 80))
                return probe.getsockname()[0]
        except OSError:
            return "127.0.0.1"

    def get_monitors(self):
        """Return the monitor list reported by ``mss``.

        Index 0 is the combined virtual screen; real monitors start at 1.
        """
        with mss.mss() as sct:
            return sct.monitors

    # ------------------------------------------------------------------
    # Persistent settings
    # ------------------------------------------------------------------
    @staticmethod
    def _config_path():
        """Return the path of the JSON settings file in the user's home."""
        return os.path.join(os.path.expanduser("~"), ".screen_mirror_config.json")

    def load_config(self):
        """Restore the previously saved settings, ignoring out-of-range values.

        Does nothing when no settings file exists.  Read or parse problems
        are reported in the status bar instead of being raised.
        """
        config_path = self._config_path()
        if not os.path.exists(config_path):
            return
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)

            # Monitor 0 is the virtual "all screens" entry, so the last valid
            # index is len(monitors) - 1.
            last_screen = config.get("last_screen", 1)
            if 1 <= last_screen < len(self.monitors):
                self.screen_var.set(last_screen)

            quality = config.get("quality", 70)
            if 30 <= quality <= 95:
                self.quality_var.set(quality)

            frame_rate = config.get("frame_rate", 30)
            if 5 <= frame_rate <= 60:
                self.fps_var.set(frame_rate)

            resolution = config.get("resolution", 0.7)
            if 0.3 <= resolution <= 1.0:
                self.res_var.set(resolution)

            self.status_var.set("已加載上次的設置")
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # ValueError covers malformed JSON; TypeError/AttributeError
            # cover a well-formed file with unexpected value types.
            self.status_var.set(f"加載配置失敗: {e}")

    def save_config(self):
        """Write the current UI settings to the settings file.

        Failures are reported in the status bar instead of being raised.
        """
        config_path = self._config_path()
        try:
            config = {
                "last_screen": self.screen_var.get(),
                "quality": self.quality_var.get(),
                "frame_rate": self.fps_var.get(),
                "resolution": self.res_var.get(),
            }
            os.makedirs(os.path.dirname(config_path), exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                json.dump(config, f)
        except (OSError, ValueError, tk.TclError) as e:
            self.status_var.set(f"保存配置失敗: {e}")

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def _add_scale_row(self, parent, label_text, variable, lower, upper, digits):
        """Add a "label / slider / current value" row bound to ``variable``.

        Args:
            parent: Container widget for the row.
            label_text: Caption shown to the left of the slider.
            variable: Tk variable the slider is bound to.
            lower: Minimum slider value.
            upper: Maximum slider value.
            digits: Decimal places to keep; 0 stores an ``int``.
        """
        row = ttk.Frame(parent)
        row.pack(fill="x", padx=10, pady=5)
        ttk.Label(row, text=label_text).pack(side="left")

        def on_change(raw_value):
            # ttk.Scale reports arbitrary floats; snap to the displayed
            # precision so the value label and the saved setting stay tidy.
            snapped = round(float(raw_value), digits)
            variable.set(int(snapped) if digits == 0 else snapped)
            self.save_config()

        scale = ttk.Scale(
            row,
            from_=lower,
            to=upper,
            variable=variable,
            orient="horizontal",
            length=200,
            command=on_change,
        )
        scale.pack(side="left", padx=10)
        ttk.Label(row, textvariable=variable).pack(side="left")

    def create_widgets(self):
        """Create every widget of the main window."""
        # Title
        title_label = ttk.Label(
            self.root, text="低延遲屏幕投屏工具", font=("Arial", 16, "bold")
        )
        title_label.pack(pady=10)

        # Screen selection
        screen_frame = ttk.LabelFrame(self.root, text="選擇投屏屏幕")
        screen_frame.pack(pady=5, padx=20, fill="x")
        self.screen_var = tk.IntVar(value=1)
        screens = ["主屏幕", "屏幕2", "屏幕3", "屏幕4"]
        for number, caption in enumerate(screens, start=1):
            rb = ttk.Radiobutton(
                screen_frame,
                text=caption,
                variable=self.screen_var,
                value=number,
                command=self.save_config,
            )
            rb.grid(row=0, column=number - 1, padx=10, pady=5)

        # Performance settings
        perf_frame = ttk.LabelFrame(self.root, text="性能設置")
        perf_frame.pack(pady=5, padx=20, fill="x")
        self.quality_var = tk.IntVar(value=70)
        self._add_scale_row(perf_frame, "圖像質量:", self.quality_var, 30, 95, 0)
        self.fps_var = tk.IntVar(value=30)
        self._add_scale_row(perf_frame, "幀率:", self.fps_var, 5, 60, 0)
        self.res_var = tk.DoubleVar(value=0.7)
        self._add_scale_row(perf_frame, "分辨率:", self.res_var, 0.3, 1.0, 2)

        # Start / stop buttons
        button_frame = ttk.Frame(self.root)
        button_frame.pack(pady=10)
        self.start_button = ttk.Button(
            button_frame,
            text="啟動投屏",
            command=self.start_mirroring,
            width=15,
        )
        self.start_button.grid(row=0, column=0, padx=10)
        self.stop_button = ttk.Button(
            button_frame,
            text="停止投屏",
            command=self.stop_mirroring,
            state="disabled",
            width=15,
        )
        self.stop_button.grid(row=0, column=1, padx=10)

        # Viewer link
        link_frame = ttk.LabelFrame(self.root, text="投屏鏈接")
        link_frame.pack(pady=5, padx=20, fill="x")
        self.link_var = tk.StringVar(value="請先啟動投屏")
        link_label = ttk.Label(
            link_frame,
            textvariable=self.link_var,
            font=("Arial", 10),
            anchor="center",
        )
        link_label.pack(pady=5, padx=10)
        copy_button = ttk.Button(
            link_frame,
            text="復制鏈接",
            command=self.copy_link,
            width=10,
        )
        copy_button.pack(pady=5)

        # Performance read-outs
        perf_info_frame = ttk.LabelFrame(self.root, text="性能信息")
        perf_info_frame.pack(pady=5, padx=20, fill="x")
        self.fps_info_var = tk.StringVar(value="FPS: --")
        self.latency_var = tk.StringVar(value="延遲: --")
        self.size_var = tk.StringVar(value="幀大小: --")
        for info_var in (self.fps_info_var, self.latency_var, self.size_var):
            ttk.Label(perf_info_frame, textvariable=info_var).pack(
                anchor="w", padx=10, pady=2
            )

        # Contact line
        contact_frame = ttk.Frame(self.root)
        contact_frame.pack(pady=5, padx=20, fill="x")
        contact_label = ttk.Label(
            contact_frame,
            text="來自吾愛破解原創 52pojie.cn 如有問題請聯系開發者VX: SoullesFox",
            font=("Arial", 10, "bold"),
            foreground="red",
            anchor="center",
        )
        contact_label.pack(fill="x")

        # Status bar
        self.status_var = tk.StringVar(value="就緒")
        status_bar = ttk.Label(
            self.root,
            textvariable=self.status_var,
            relief="sunken",
            anchor="w",
        )
        status_bar.pack(side="bottom", fill="x")

    def add_firewall_rule(self):
        """Ensure a Windows firewall rule allows inbound TCP on ``PORT``.

        No-op on non-Windows systems.  The existence check relies on the
        exit status of ``netsh`` rather than on its localized output text.
        """
        if os.name != 'nt':
            return
        netsh = ["netsh", "advfirewall", "firewall"]
        rule = f"name={self.FIREWALL_RULE_NAME}"
        quiet = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
        try:
            lookup = subprocess.run(netsh + ["show", "rule", rule], **quiet)
            if lookup.returncode == 0:
                return  # the rule already exists
            added = subprocess.run(
                netsh + [
                    "add", "rule", rule,
                    "dir=in", "action=allow", "protocol=TCP",
                    f"localport={self.PORT}",
                ],
                **quiet,
            )
            if added.returncode == 0:
                self.status_var.set("已添加防火墻規則")
            else:
                self.status_var.set(
                    f"防火墻設置失敗: netsh exit code {added.returncode}"
                )
        except OSError as e:
            self.status_var.set(f"防火墻設置失敗: {e}")

    def copy_link(self):
        """Copy the viewer URL to the clipboard once mirroring has started."""
        link = self.link_var.get()
        if link == "請先啟動投屏":
            return
        self.root.clipboard_clear()
        self.root.clipboard_append(link)
        self.status_var.set("鏈接已復制到剪貼板")

    # ------------------------------------------------------------------
    # Mirroring control
    # ------------------------------------------------------------------
    def _build_flask_app(self):
        """Create the Flask application with the viewer and stream routes."""
        app = Flask(__name__)

        @app.route('/')
        def index():
            return self.INDEX_HTML

        @app.route('/screen')
        def screen():
            client_id = request.args.get('cid', '')
            return Response(
                self.generate_frames(client_id),
                mimetype='multipart/x-mixed-replace; boundary=frame',
            )

        return app

    def _ensure_server_running(self):
        """Start the HTTP server thread unless one is already serving.

        The server keeps running after stop_mirroring() (it then serves
        black frames), so a later start must reuse it instead of trying to
        bind the same port a second time.
        """
        if self.server_thread is not None and self.server_thread.is_alive():
            return
        self.flask_app = self._build_flask_app()
        self.server_thread = threading.Thread(
            target=self.run_server,
            daemon=True,
        )
        self.server_thread.start()

    def start_mirroring(self):
        """Start capturing the selected screen and serving it over HTTP."""
        if self.mirroring:
            return

        # The radio buttons always offer four screens; refuse ones that the
        # system does not actually have (index 0 is the virtual screen).
        screen_idx = self.screen_var.get()
        if not 1 <= screen_idx < len(self.monitors):
            self.status_var.set(f"所選屏幕不存在: {screen_idx}")
            return

        self.mirroring = True
        self.start_button.config(state="disabled")
        self.stop_button.config(state="normal")
        self.status_var.set("loading...")

        # Snapshot the current slider settings for this session.
        self.quality = self.quality_var.get()
        self.frame_rate = self.fps_var.get()
        self.resolution = self.res_var.get()

        self.frame_count = 0
        self.start_time = time.time()

        self.start_capture_thread()
        self._ensure_server_running()

        port = self.PORT
        self.link_var.set(f"http://{self.ip_address}:{port}")
        self.status_var.set(f"投屏已啟動 - 正在監聽端口 {port}")

        # Make sure only one performance timer chain is ever active.
        if self._perf_job is not None:
            self.root.after_cancel(self._perf_job)
        self._perf_job = self.root.after(1000, self.update_performance)

    def start_capture_thread(self):
        """(Re)start the capture thread with the current settings."""
        if self.capture_thread and self.capture_thread.is_alive():
            self.stop_capture.set()
            self.capture_thread.join(timeout=1.0)

        # Drop any stale frame left over from a previous session.
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break

        self.stop_capture.clear()
        self.capture_thread = threading.Thread(
            target=self.capture_screen,
            args=(
                self.screen_var.get(),
                self.quality,
                self.frame_rate,
                self.resolution,
            ),
            daemon=True,
        )
        self.capture_thread.start()

    def _publish_frame(self, frame_bytes):
        """Make ``frame_bytes`` the frame waiting in the one-slot queue.

        An unconsumed older frame is discarded so that viewers always get
        the most recent image.
        """
        try:
            self.frame_queue.put_nowait(frame_bytes)
            return
        except queue.Full:
            pass
        try:
            self.frame_queue.get_nowait()
        except queue.Empty:
            pass  # a consumer took the old frame in the meantime
        try:
            self.frame_queue.put_nowait(frame_bytes)
        except queue.Full:
            pass  # lost a race for the slot; the next frame follows shortly

    def capture_screen(self, monitor_idx, quality, frame_rate, resolution):
        """Capture loop run in the capture thread.

        Args:
            monitor_idx: Index into ``self.monitors`` of the screen to grab.
            quality: JPEG quality passed to OpenCV.
            frame_rate: Target frames per second.
            resolution: Scale factor applied to the monitor size; values
                below 1.0 downscale the image.

        Errors are reported through the status bar; the loop ends when
        ``self.stop_capture`` is set.
        """
        try:
            monitor = self.monitors[monitor_idx]
            width = max(1, int(monitor['width'] * resolution))
            height = max(1, int(monitor['height'] * resolution))
            frame_interval = 1.0 / max(frame_rate, 1)
            encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), int(quality)]

            window_start = time.time()
            frames_in_window = 0

            with mss.mss() as sct:
                while not self.stop_capture.is_set():
                    frame_start = time.time()
                    try:
                        img = np.array(sct.grab(monitor))
                        if resolution < 1.0:
                            img = cv2.resize(
                                img, (width, height),
                                interpolation=cv2.INTER_AREA,
                            )
                        ok, jpeg_data = cv2.imencode('.jpg', img, encode_params)
                        if not ok:
                            raise RuntimeError("JPEG encoding failed")
                        self._publish_frame(jpeg_data.tobytes())

                        # Pace the loop.  Waiting on the stop event instead
                        # of sleeping lets a stop request end the wait early.
                        remaining = frame_interval - (time.time() - frame_start)
                        if remaining > 0:
                            self.stop_capture.wait(remaining)

                        frames_in_window += 1
                        now = time.time()
                        if now - window_start >= 1.0:
                            actual_fps = frames_in_window / (now - window_start)
                            self._post_ui(
                                self.status_var.set,
                                f"捕獲中: {actual_fps:.1f} FPS",
                            )
                            window_start = now
                            frames_in_window = 0
                    except Exception as e:
                        # Thread boundary: report and keep trying.
                        self._post_ui(self.status_var.set, f"捕獲錯誤: {e}")
                        self.stop_capture.wait(0.1)
        except Exception as e:
            self._post_ui(self.status_var.set, f"捕獲初始化錯誤: {e}")

    def update_performance(self):
        """Refresh the delivered-FPS read-out once per second while mirroring."""
        if not self.mirroring:
            self._perf_job = None
            return
        elapsed = time.time() - self.start_time
        fps = self.frame_count / elapsed if elapsed > 0 else 0
        self.fps_info_var.set(f"FPS: {fps:.1f}")
        self.frame_count = 0
        self.start_time = time.time()
        self._perf_job = self.root.after(1000, self.update_performance)

    def stop_mirroring(self):
        """Stop capturing; connected viewers are switched to a black image.

        The HTTP server thread is left running so it can be reused by the
        next start.
        """
        if not self.mirroring:
            return
        self.mirroring = False
        self.start_button.config(state="normal")
        self.stop_button.config(state="disabled")

        if self._perf_job is not None:
            self.root.after_cancel(self._perf_job)
            self._perf_job = None

        self.stop_capture.set()
        if self.capture_thread and self.capture_thread.is_alive():
            self.capture_thread.join(timeout=1.0)

        # Apply whatever the workers queued before stopping, then publish
        # the final state so that it is not overwritten by a stale message.
        self._run_pending_ui()
        self.status_var.set("投屏已停止")

    def generate_frames(self, client_id):
        """Yield multipart MJPEG chunks for one ``/screen`` request.

        Args:
            client_id: Identifier sent by the viewer page (currently unused).

        Yields:
            ``bytes`` chunks, each holding one JPEG part.  Live frames are
            produced while mirroring is active; afterwards the black image
            is repeated until the client disconnects.
        """
        part_header = b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
        while self.mirroring:
            try:
                # Short timeout so that a stop request is noticed quickly.
                jpeg_data = self.frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self.last_frame = jpeg_data
            self.frame_count += 1
            # This runs in a server thread: hand the Tk update over.
            self._post_ui(
                self.size_var.set,
                f"幀大小: {len(jpeg_data)/1024:.1f} KB",
            )
            yield part_header + jpeg_data + b'\r\n'

        while True:
            yield part_header + self.black_image + b'\r\n'
            time.sleep(0.1)

    def _on_server_error(self, message):
        """Reset the UI after the HTTP server failed (runs on the Tk thread)."""
        self.stop_mirroring()
        self.status_var.set(f"服務器錯誤: {message}")

    def run_server(self):
        """Run the Flask development server; blocks until it exits.

        Executed in the server thread.  Failures are reported to the Tk
        thread, which stops mirroring and shows the error.
        """
        try:
            self.flask_app.run(
                host='0.0.0.0',
                port=self.PORT,
                threaded=True,
                use_reloader=False,
            )
        except SystemExit:
            # Recent werkzeug versions leave via sys.exit() when the
            # listening socket cannot be bound (e.g. port already in use).
            self._post_ui(
                self._on_server_error, f"無法監聽端口 {self.PORT}"
            )
        except Exception as e:
            self._post_ui(self._on_server_error, str(e))
def is_admin():
    """Return True when the process runs with Windows administrator rights.

    Returns False on platforms without ``ctypes.windll`` (i.e. non-Windows)
    or when the shell32 call fails.
    """
    try:
        # IsUserAnAdmin() returns a Win32 BOOL (int); normalise it.
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except (AttributeError, OSError):
        return False
def main():
    """Entry point: request elevation on Windows, then run the Tk app."""
    if os.name == 'nt' and not is_admin():
        # In a frozen build sys.executable already is the program itself,
        # so argv[0] must not be passed again as a parameter.
        params = sys.argv[1:] if getattr(sys, 'frozen', False) else sys.argv
        # list2cmdline quotes arguments that contain spaces (e.g. a script
        # path under "Program Files").
        result = ctypes.windll.shell32.ShellExecuteW(
            None, "runas", sys.executable,
            subprocess.list2cmdline(params), None, 1,
        )
        # ShellExecute reports success with a value greater than 32; the
        # elevated copy takes over in that case.
        if result > 32:
            sys.exit(0)
        # Elevation was refused or failed: keep running without it.  Only
        # the automatic firewall rule needs administrator rights.

    root = tk.Tk()
    app = ScreenMirrorApp(root)

    def on_close():
        app.stop_mirroring()
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_close)
    root.mainloop()


if __name__ == "__main__":
    main()