编程问题能在这问吗?
本帖最后由 proto 于 2025-7-26 19:08 编辑我想实时接收程序A用udp传输数据包里的两三个数据,把它们通过websocket传输到另一个程序B中。
程序A只能udp输出,程序B只接收websocket协议传输
这个数据不需要每次收到都发送,大概每0.1秒甚至0.5秒发送最新1次接收到的那两三个数据即可
这种程序可以用python编写吗?能保证长期实时传输吗?
我用ai编程辅助编写的差不多,程序B可以收到信号了。但是有很严重的延迟,而且运行几秒后我的数据转接程序就卡住跳出了。
看到有用C#实现类似功能的,但是目前ai辅助编程做C#还是不完善,git上看到类似的C#程序资源包里往往不止一个文件,我看到就更不知道该用哪些了。
有哪些C#教程比较推荐?我总想着上来就把引用库什么的学到,可看很多C#教程从入门讲到基础进阶也没在标题上看到类似介绍引用的章节。我应该从头学吗?
你需要的可能是grpc 都封装好了写吧写吧就行了 能让程序B接收到信号的python文件是用Claude 4 sonnet改出来的,我用的镜像站还有Claude 3.7 sonnet thinking消耗算力更高。这个thinking是什么意思?在哪些方面更高效? Misono_Mayu 发表于 2025-7-26 19:03
换个方案,试试sse?也许性能比这个好,毕竟你这个只是单方发送不需要给A接口回调数据,或者你试试异步调用 ...
都是在一台计算机上的 又是一个会者不难,难者不会的问题。
这个东西实现起来很简单,就是把左手的东西倒腾到右手,核心逻辑都不到20行代码。
和你用什么语言没啥关系,用啥都一样,用python绝对可以写。(虽然我更建议你用c++)
但问题是,你想象得出来这个程序的核心逻辑是怎样的吗?
你接下来要干的是通过ai问清楚这个东西的工作原理,而不是用什么语言来实现。
当你弄懂用原理是什么之后,你可以很轻松地通过ai来完成这件事情。 本帖最后由 鸳鸳相抱 于 2025-7-27 07:33 编辑
python性能问题不大,我们平时写个脚本订阅mqtt一秒收几千帧转发udp都没啥压力
但是写脚本的经常会忽略各种异常处理导致脚本健壮性一上来就不太行 “半专业人士”是如何形成的,例如楼主懂得udp和WS,却看起来完全不会写代码,是运维出身吗?
—— 来自 Xiaomi 25019PNF3C, Android 15, 鹅球 v3.4.98 用qt写,很简单的
—— 来自 meizu MEIZU 21 Pro, Android 14, 鹅球 v3.5.99 收到转发很简单 问题是没收到或者是乱序怎么办
—— 来自 OnePlus LE2120, Android 14, 鹅球 v3.5.99 websocket不是tcp上的协议吗,跟udp啥关系,你想说的是不是webrtc? 用什么语言和 runtime 都没问题,调用操作系统的 api 都是一样的,肯定是你上层执行逻辑的问题 这不就是个中转服务么。。。C#完全可以做到,你需要考虑的是做好的服务放在什么容器里面 收和发用不同线程来写,在接收线程里加定时,收不到一直等容易卡死
程序B可以收到信号了。但是有很严重的延迟,而且运行几秒后我的数据转接程序就卡住跳出了。 你用ai写代码那就别来问了,ai写的什么玩意我测试过,要真能用码农早就全部去送外卖了
我理解数据源是程序A,udp 吐出来的,程序B 只能通过 ws 接,这两个都是已经存在的程序,不能修改。现在要写一个胶水程序把 A 的数据转发到 B?
那核心就是怎么读程序A的数据 怎么用ws转发到程序B ,以及给x秒内最新的数据
1. 程序A,这个数据包多大?会不会有沾包的问题?数据格式清楚么?这些背景信息得搞清楚了才能开发
2. 程序B的格式是什么样的?发送数据的话调包解决
3. 时间窗口的话,简单手搓就是每次来数据了就缓存一下,如果定时器两次之间有更新就转给程序B,同时更新这个数据为未修改过的状态。这里可能要加锁避免并发问题 你要AI写的话,需求得描述清楚。最好用 vscode 上的 roocode 插件,说明白需求后它直接就帮你一步一步写好了 代码没有,运行环境保密,玩蛋去吧…
你好,我需要一个python程序,它接收一组udp消息,解析udp里的特定数据,然后把解析后的重新组织的数据通过websocket协议发给另一个程序。解析函数占位即可,我后续再编写。
websocket是另一个服务端,我这里需要的程序是一个websocket客户端
明白了,你需要一个程序作为 UDP 服务器 接收数据,解析后作为 WebSocket 客户端 连接到另一个 WebSocket 服务端并发送数据。以下是修改后的代码:
import asyncio
import websockets
import socket
import json
from typing import Dict, Any
class UDPToWebSocketClient:
def __init__(self, udp_host: str = '0.0.0.0', udp_port: int = 9999,
ws_uri: str = 'ws://localhost:8765'):
self.udp_host = udp_host
self.udp_port = udp_port
self.ws_uri = ws_uri
self.udp_socket = None
self.websocket = None
self.running = False
async def parse_udp_message(self, data: bytes) -> Dict:
"""
解析UDP消息的占位函数
这里应该实现具体的解析逻辑,返回解析后的字典
"""
# TODO: 实现具体的解析逻辑
parsed_data = {
'raw_data': data.hex(),
'length': len(data),
# 可以添加更多解析后的字段
}
return parsed_data
async def handle_udp_messages(self):
"""处理UDP消息接收"""
loop = asyncio.get_running_loop()
# 创建UDP socket
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.bind((self.udp_host, self.udp_port))
self.udp_socket.setblocking(False)
print(f"Listening for UDP messages on {self.udp_host}:{self.udp_port}")
while self.running:
try:
data, addr = await loop.sock_recvfrom(self.udp_socket, 65536)
print(f"Received UDP message from {addr}: {data}")
# 解析UDP消息
parsed_data = await self.parse_udp_message(data)
# 发送给WebSocket服务器
if self.websocket and self.websocket.open:
await self.websocket.send(json.dumps(parsed_data))
else:
print("WebSocket connection not ready, message dropped")
except Exception as e:
print(f"Error handling UDP message: {e}")
await asyncio.sleep(0.1)# 避免CPU占用过高
async def connect_websocket(self):
"""连接到WebSocket服务器"""
while self.running:
try:
print(f"Connecting to WebSocket server at {self.ws_uri}...")
async with websockets.connect(self.ws_uri) as ws:
self.websocket = ws
print("Connected to WebSocket server")
# 保持连接,直到断开或程序停止
await self.websocket.wait_closed()
print("WebSocket connection closed, reconnecting...")
except Exception as e:
print(f"WebSocket connection error: {e}")
await asyncio.sleep(5)# 等待5秒后重连
async def run(self):
"""启动桥接服务"""
self.running = True
# 启动WebSocket客户端(自动重连)
ws_task = asyncio.create_task(self.connect_websocket())
# 启动UDP服务器
udp_task = asyncio.create_task(self.handle_udp_messages())
# 等待任务完成(理论上不会结束,除非手动停止)
await asyncio.gather(ws_task, udp_task)
def stop(self):
"""停止服务"""
self.running = False
if self.udp_socket:
self.udp_socket.close()
if self.websocket:
asyncio.create_task(self.websocket.close())
async def main():
bridge = UDPToWebSocketClient(
udp_host='0.0.0.0',# 监听所有网络接口
udp_port=9999, # UDP监听端口
ws_uri='ws://localhost:8765'# WebSocket服务器地址
)
try:
await bridge.run()
except KeyboardInterrupt:
print("Shutting down...")
bridge.stop()
if __name__ == '__main__':
asyncio.run(main()) 我不是专业的,单纯想问一下,为啥要用C#?Java没有现实这功能的方法?
—— 来自 S1Fun WilhelmG 发表于 2025-7-28 20:10
我不是专业的,单纯想问一下,为啥要用C#?Java没有现实这功能的方法?
—— 来自 S1Fun ...
集中技术栈可以少装环境,保证系统“轻量”
页:
[1]