diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 682df84d..be978a38 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -346,6 +346,69 @@ def parse_args(): default="", help="Workflow description, used when publishing the workflow", ) + + # package subcommand: 社区设备包 inspect / upload + package_parser = subparsers.add_parser( + "package", + aliases=["pkg"], + help="Community device package tools: inspect / upload / install", + ) + package_actions = package_parser.add_subparsers( + title="package actions", dest="package_action" + ) + for action_name in ("inspect", "upload"): + action_parser = package_actions.add_parser( + action_name, + help=( + "Scan package dir and generate package_info/archive (local only)" + if action_name == "inspect" + else "Inspect then upload archive + package_info to backend /lab/resource" + ), + ) + action_parser.add_argument( + "--path", + dest="package_path", + type=str, + required=True, + help="Path to the community device package directory (contains pyproject.toml)", + ) + action_parser.add_argument( + "--namespace", + type=str, + default=None, + help="Class namespace, e.g. community.acme; defaults to community.", + ) + action_parser.add_argument( + "--out", + type=str, + default=None, + help="Output dir for archive/package_info.json (default: /../dist)", + ) + if action_name == "upload": + action_parser.add_argument( + "--download-url", + dest="download_url", + type=str, + default="", + help="Explicit reachable archive URL (skips OSS upload; handy for local static server)", + ) + + # install:开发者本地调试入口 + install_parser = package_actions.add_parser( + "install", + help="Install a pip spec / git URL locally (uv pip > pip), then scan @device IDs", + ) + install_parser.add_argument( + "install_spec", + type=str, + help="pip spec (name==version / name) or git URL (git+https://...)", + ) + install_parser.add_argument( + "--no-inspect", + dest="no_inspect", + action="store_true", + help="Skip post-install @device scan / device listing", + ) return parser @@ -509,6 +572,25 @@ def main(): print_status("传入了sk参数,优先采用传入参数!", "info") BasicConfig.working_dir = working_dir + # package 子命令:在配置/鉴权就绪后尽早处理,不进入设备 bootstrap + if args_dict.get("command") in ("package", "pkg"): + from unilabos.app.package_cli import PackageCLIError, cmd_package + + package_http_client = None + if args_dict.get("package_action") == "upload": + if not (BasicConfig.ak and BasicConfig.sk): + print_status("package upload 需要 --ak/--sk 鉴权信息", "error") + os._exit(1) + from unilabos.app.web import http_client as _http_client_for_package + + package_http_client = _http_client_for_package + try: + cmd_package(args_dict, http_client=package_http_client) + except PackageCLIError as exc: + print_status(str(exc), "error") + os._exit(1) + return + workflow_upload = args_dict.get("command") in ("workflow_upload", "wf") # 使用远程资源启动 diff --git a/unilabos/app/package_cli.py b/unilabos/app/package_cli.py new file mode 100644 index 00000000..cdcd2642 --- /dev/null +++ b/unilabos/app/package_cli.py @@ -0,0 +1,726 @@ +""" +社区设备包 CLI:inspect / upload / install +""" + +import hashlib +import json +import os +import re +import subprocess +import sys +import tarfile +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from unilabos.utils import logger +from unilabos.utils.banner_print import print_status + +COMMUNITY_PREFIX = "community." +DEFAULT_SOURCE_TYPE = "community" +ARCHIVE_EXCLUDE_DIRS = { + "__pycache__", + ".git", + ".idea", + ".vscode", + "dist", + "build", + ".pytest_cache", + "unilabos_data", + ".venv", + "venv", + "node_modules", +} +ARCHIVE_EXCLUDE_SUFFIXES = {".pyc", ".pyo"} + + +class PackageCLIError(RuntimeError): + """package 子命令执行过程中的可预期错误。""" + + +def normalize_name(name: str) -> str: + """归一化包名:小写、连字符转下划线(与 Edge _normalize_package_dir_name 取向一致)。""" + return name.strip().lower().replace("-", "_") + + +def resolve_class_namespace(project_name: str, namespace: Optional[str]) -> str: + """确定 class_namespace:显式 --namespace 优先,否则 community.<归一化包名>。""" + if namespace: + ns = namespace.strip() + if not ns.startswith(COMMUNITY_PREFIX): + ns = COMMUNITY_PREFIX + ns + return ns + return COMMUNITY_PREFIX + normalize_name(project_name) + + +def read_pyproject(pkg_dir: Path) -> Dict[str, Any]: + """读取 pyproject.toml 的 [project] 表,返回 name/version/summary/license/homepage/dependencies 等。""" + pyproject_path = pkg_dir / "pyproject.toml" + if not pyproject_path.is_file(): + raise PackageCLIError(f"未找到 pyproject.toml:{pyproject_path}") + + data = _load_toml(pyproject_path) + project = data.get("project", {}) if isinstance(data, dict) else {} + if not isinstance(project, dict): + project = {} + + name = str(project.get("name") or "").strip() + if not name: + raise PackageCLIError("pyproject.toml [project].name 为空,无法生成 package_info") + version = str(project.get("version") or "0.0.0").strip() + + license_value = project.get("license") + if isinstance(license_value, dict): + license_str = str(license_value.get("text") or license_value.get("file") or "") + else: + license_str = str(license_value or "") + + urls = project.get("urls") if isinstance(project.get("urls"), dict) else {} + homepage = "" + for key in ("Homepage", "homepage", "Repository", "repository", "Source", "source"): + if urls.get(key): + homepage = str(urls[key]) + break + + dependencies = project.get("dependencies") + deps: List[str] = [str(d) for d in dependencies] if isinstance(dependencies, list) else [] + + return { + "name": name, + "version": version, + "summary": str(project.get("description") or ""), + "license": license_str, + "homepage": homepage, + "dependencies": deps, + } + + +def scan_package_devices(pkg_dir: Path) -> Dict[str, Dict[str, Any]]: + """纯 AST 扫描包目录下的 @device 注册表,返回 {device_id: meta}。 + """ + from unilabos.registry.ast_registry_scanner import scan_directory + + py_files = [ + f + for f in pkg_dir.rglob("*.py") + if not f.name.startswith("__") + and not (set(f.relative_to(pkg_dir).parts) & ARCHIVE_EXCLUDE_DIRS) + ] + executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="PackageInspect") + try: + result = scan_directory(pkg_dir, executor=executor, include_files=py_files) + finally: + executor.shutdown(wait=True) + devices = result.get("devices", {}) + return {did: meta for did, meta in devices.items() if isinstance(meta, dict)} + + +def read_registry_yaml_devices(pkg_dir: Path) -> Dict[str, Dict[str, Any]]: + """读取包目录下 registry.yaml/*.yaml 里的设备注册表条目,返回 {device_id: entry}。 + + community_drivers 标准布局(driver.py + registry.yaml + startup.json)使用 YAML 注册表, + 其条目天然含 class.action_value_mappings/schema,是最完整的 source_registry。 + 仅采纳 resource_type=device 或带 class.action_value_mappings 的条目。 + """ + try: + import yaml + except ModuleNotFoundError: + logger.warning("[package] 未安装 pyyaml,跳过 registry.yaml 读取") + return {} + + entries: Dict[str, Dict[str, Any]] = {} + for yaml_path in sorted(list(pkg_dir.glob("*.yaml")) + list(pkg_dir.glob("*.yml"))): + try: + data = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + except Exception as exc: + logger.warning(f"[package] 解析 {yaml_path} 失败: {exc}") + continue + if not isinstance(data, dict): + continue + for device_id, entry in data.items(): + if not isinstance(entry, dict): + continue + cls = entry.get("class") if isinstance(entry.get("class"), dict) else {} + is_device = entry.get("resource_type") == "device" or bool(cls.get("action_value_mappings")) + if is_device: + entries[str(device_id)] = entry + return entries + + +def build_archive(pkg_dir: Path, archive_path: Path) -> str: + """把包目录打包为 tar.gz,跳过缓存/版本控制目录,返回 "sha256:"。""" + archive_path.parent.mkdir(parents=True, exist_ok=True) + arc_root = pkg_dir.name + + def _filter(tarinfo: tarfile.TarInfo) -> Optional[tarfile.TarInfo]: + parts = set(Path(tarinfo.name).parts) + if parts & ARCHIVE_EXCLUDE_DIRS: + return None + if Path(tarinfo.name).suffix in ARCHIVE_EXCLUDE_SUFFIXES: + return None + return tarinfo + + with tarfile.open(archive_path, "w:gz") as tar: + tar.add(str(pkg_dir), arcname=arc_root, filter=_filter) + + return "sha256:" + _sha256_file(archive_path) + + +_PY_TO_JSON_SCHEMA_TYPE = { + "float": "number", + "int": "integer", + "str": "string", + "bool": "boolean", + "dict": "object", + "list": "array", + "Dict": "object", + "List": "array", + "Any": "string", +} + + +def _json_schema_type(py_type: str) -> str: + """把 Python 类型注解字符串归一化为 JSON Schema type(取裸类型名,未知回退 string)。""" + base = (py_type or "").strip().split("[")[0].split(".")[-1] + return _PY_TO_JSON_SCHEMA_TYPE.get(base, "string") + + +def build_action_value_mappings(actions: Dict[str, Any]) -> Dict[str, Any]: + """把 AST 扫描的原始 action(params/return_type)转换成前后端期望的 + """ + result: Dict[str, Any] = {} + for name, meta in actions.items(): + if not isinstance(meta, dict): + continue + params = meta.get("params") if isinstance(meta.get("params"), list) else [] + goal_props: Dict[str, Any] = {} + required: List[str] = [] + goal_default: Dict[str, Any] = {} + for param in params: + if not isinstance(param, dict): + continue + pname = str(param.get("name") or "").strip() + if not pname: + continue + goal_props[pname] = {"type": _json_schema_type(str(param.get("type", ""))), "title": pname} + if param.get("required"): + required.append(pname) + if param.get("default") is not None: + goal_default[pname] = param.get("default") + goal_schema: Dict[str, Any] = {"type": "object", "properties": goal_props} + if required: + goal_schema["required"] = required + action_args = meta.get("action_args") if isinstance(meta.get("action_args"), dict) else {} + action_type_raw = action_args.get("action_type") + action_type = "UniLabJsonCommand" + if isinstance(action_type_raw, str) and action_type_raw.strip(): + action_type = action_type_raw.strip().split(":")[-1].split(".")[-1] + entry: Dict[str, Any] = { + "type": action_type, + "goal": goal_schema, + "result": {"type": "object", "properties": {}}, + "feedback": {"type": "object", "properties": {}}, + "description": str(meta.get("docstring") or action_args.get("description") or ""), + } + if goal_default: + entry["goal_default"] = goal_default + result[name] = entry + return result + + +def build_resources(devices: Dict[str, Dict[str, Any]], package_info: Dict[str, Any]) -> List[Dict[str, Any]]: + """把扫描出的设备 meta 映射为 /lab/resource 的 resources 项,并附 resource 级 source_registry。""" + resources: List[Dict[str, Any]] = [] + for device_id, meta in devices.items(): + actions = meta.get("actions") if isinstance(meta.get("actions"), dict) else {} + action_value_mappings = build_action_value_mappings(actions) + status_props = meta.get("status_properties") if isinstance(meta.get("status_properties"), dict) else {} + handles = meta.get("handles") if isinstance(meta.get("handles"), list) else [] + + reg_class = { + "module": meta.get("module", ""), + "type": meta.get("device_type", "python"), + "action_value_mappings": action_value_mappings, + "status_types": status_props, + } + # source_registry:保存设备原始注册表,供后端 BuildEffectiveTemplate 读取 class.action_value_mappings + source_registry = { + "class": reg_class, + "handles": handles, + "device_id": device_id, + "version": meta.get("version", package_info.get("version", "")), + "description": meta.get("description", ""), + "display_name": meta.get("display_name", ""), + "icon": meta.get("icon", ""), + } + category = meta.get("category") if isinstance(meta.get("category"), list) else [] + resources.append( + { + "id": device_id, + "registry_type": "device", + "version": meta.get("version", package_info.get("version", "0.0.1")), + "description": meta.get("description", ""), + "icon": meta.get("icon", ""), + "class": reg_class, + "category": category, + "handles": _map_handles(handles), + "package_info": package_info, + "source_registry": source_registry, + } + ) + return resources + + +def build_resources_from_registry( + entries: Dict[str, Dict[str, Any]], + package_info: Dict[str, Any], +) -> List[Dict[str, Any]]: + """把 registry.yaml 设备条目映射为 /lab/resource 的 resources 项。 + + 条目本身已含 class.action_value_mappings/schema,直接作为 source_registry, + 后端 BuildEffectiveTemplate 可据此构造 effective_template。 + """ + resources: List[Dict[str, Any]] = [] + for device_id, entry in entries.items(): + cls = entry.get("class") if isinstance(entry.get("class"), dict) else {} + init_schema = entry.get("init_param_schema") if isinstance(entry.get("init_param_schema"), dict) else None + category = entry.get("category") or entry.get("tags") or [] + if isinstance(category, str): + category = [category] + resource: Dict[str, Any] = { + "id": device_id, + "registry_type": str(entry.get("resource_type", "device")), + "version": str(entry.get("version", package_info.get("version", "0.0.1"))), + "description": entry.get("description", ""), + "icon": entry.get("icon", ""), + "class": { + "module": cls.get("module", ""), + "type": cls.get("type", "python"), + "action_value_mappings": cls.get("action_value_mappings", {}), + "status_types": cls.get("status_types", {}), + }, + "handles": [], + "category": category if isinstance(category, list) else [], + "manufacturer": str(entry.get("manufacturer", "")), + "model": entry.get("model"), + "scene": entry.get("scene"), + "device_params": entry.get("device_params"), + "package_info": package_info, + # source_registry:直接保存 YAML 原始条目(含 class.action_value_mappings) + "source_registry": entry, + } + if init_schema is not None: + resource["init_param_schema"] = init_schema + resources.append(resource) + return resources + + +def build_package_info( + project: Dict[str, Any], + class_namespace: str, + sha256: str, + download_url: str = "", + oss_object_key: str = "", +) -> Dict[str, Any]: + """根据 pyproject 元信息 + 命名空间 + 归档指纹构造 package_info(后端/Edge 共同消费的字段)。""" + name = project["name"] + info: Dict[str, Any] = { + "name": name, + "version": project["version"], + "class_namespace": class_namespace, + "module_prefix": class_namespace.split(".")[0] if class_namespace else "community", + "normalized_name": normalize_name(name), + "source_type": DEFAULT_SOURCE_TYPE, + "install_spec": f"{name}=={project['version']}" if project.get("version") else name, + "summary": project.get("summary", ""), + "license": project.get("license", ""), + "homepage": project.get("homepage", ""), + "sha256": sha256, + "download_url": download_url, + } + if oss_object_key: + info["oss_object_key"] = oss_object_key + return info + + +def inspect_package( + path: str, + namespace: Optional[str] = None, + out_dir: Optional[str] = None, +) -> Dict[str, Any]: + """扫描并打包一个社区设备包,产出 package_info / resources / 归档。返回结果汇总。""" + pkg_dir = Path(path).resolve() + if not pkg_dir.is_dir(): + raise PackageCLIError(f"包目录不存在:{pkg_dir}") + + project = read_pyproject(pkg_dir) + class_namespace = resolve_class_namespace(project["name"], namespace) + + out_path = Path(out_dir).resolve() if out_dir else (pkg_dir.parent / "dist") + out_path.mkdir(parents=True, exist_ok=True) + archive_name = f"{normalize_name(project['name'])}-{project['version']}.tar.gz" + archive_path = out_path / archive_name + sha256 = build_archive(pkg_dir, archive_path) + + package_info = build_package_info(project, class_namespace, sha256) + + # 设备来源优先级:registry.yaml(含完整 action_value_mappings)> @device AST 扫描 + yaml_entries = read_registry_yaml_devices(pkg_dir) + if yaml_entries: + device_source = "registry.yaml" + device_ids = sorted(yaml_entries) + resources = build_resources_from_registry(yaml_entries, package_info) + else: + device_source = "@device AST" + ast_devices = scan_package_devices(pkg_dir) + device_ids = sorted(ast_devices) + resources = build_resources(ast_devices, package_info) + devices = {rid: None for rid in device_ids} + if not resources: + print_status(f"警告:{pkg_dir} 未发现 registry.yaml 或 @device 设备,仅生成 package_info", "warning") + + package_info_path = out_path / "package_info.json" + resources_path = out_path / "resources.json" + package_info_path.write_text(json.dumps(package_info, ensure_ascii=False, indent=2), encoding="utf-8") + resources_path.write_text(json.dumps(resources, ensure_ascii=False, indent=2), encoding="utf-8") + + print_status(f"package inspect 完成:{project['name']}@{project['version']}", "info") + print_status(f" class_namespace : {class_namespace}", "info") + print_status(f" 设备来源 : {device_source}", "info") + print_status(f" 设备数 : {len(resources)} ({', '.join(device_ids) or '无'})", "info") + print_status(f" 归档 : {archive_path} ({sha256})", "info") + print_status(f" package_info : {package_info_path}", "info") + print_status(f" resources : {resources_path}", "info") + + return { + "project": project, + "class_namespace": class_namespace, + "devices": devices, + "archive_path": str(archive_path), + "sha256": sha256, + "package_info": package_info, + "resources": resources, + "package_info_path": str(package_info_path), + "resources_path": str(resources_path), + } + + +def upload_package( + path: str, + http_client: Any, + namespace: Optional[str] = None, + out_dir: Optional[str] = None, + download_url: str = "", +) -> Dict[str, Any]: + """inspect → 上传归档(或用显式 download_url)→ 带顶层 package_info 调 /lab/resource。""" + if http_client is None: + raise PackageCLIError("upload 需要有效的 http_client(请确认已传 --ak/--sk)") + + result = inspect_package(path, namespace=namespace, out_dir=out_dir) + package_info: Dict[str, Any] = result["package_info"] + archive_path = result["archive_path"] + + final_url, object_key = _resolve_download_target(http_client, archive_path, download_url) + package_info["download_url"] = final_url + if object_key: + package_info["oss_object_key"] = object_key + + # 同步顶层 package_info 到每个 resource 的 package_info,确保 resource 级也带 download_url/sha256 + resources = result["resources"] + for item in resources: + item["package_info"] = package_info + + response = http_client.upload_package_resources(resources, package_info) + status = getattr(response, "status_code", None) + text = getattr(response, "text", "") + if status not in (200, 201): + raise PackageCLIError(f"上传 /lab/resource 失败:{status} {text}") + + print_status("package upload 完成,设备模板已落库 package_info + source_registry", "info") + print_status(f" download_url : {final_url or '(空,请确认 OSS 或 --download-url)'}", "info") + print_status(f" class_namespace : {package_info['class_namespace']}", "info") + print_status(" 现在可用含 community.* 节点的 graph 启动 Edge 触发 resolve/下载", "info") + + return { + "package_info": package_info, + "resources": resources, + "download_url": final_url, + "response_status": status, + } + + +def install_package(spec: str, run_inspect: bool = True) -> Dict[str, Any]: + """本地安装一个设备包:uv pip install 优先、回退 pip install, + """ + spec = (spec or "").strip() + if not spec: + raise PackageCLIError("缺少安装目标,用法:unilab package install ") + + installer = _run_pip_install(spec) + print_status(f"package install 完成:{spec}({installer})", "info") + + # PyPI 规格直接取名;本地目录/文件路径装完后从其 pyproject.toml 读分发名(git/URL 仍取不到)。 + dist_name = _spec_dist_name(spec) or _local_dist_name(spec) + device_ids: List[str] = [] + if run_inspect and dist_name: + device_ids = _installed_device_ids(dist_name) + + if device_ids: + print_status(f" 包内可用设备 : {', '.join(device_ids)}", "info") + elif dist_name: + print_status(f" 已安装分发 : {dist_name}(未扫描到 @device,可能非 Uni-Lab 设备包)", "info") + else: + print_status(" 已安装(git/URL 来源,无法确定分发名,跳过设备扫描)", "info") + + return {"spec": spec, "installer": installer, "dist_name": dist_name, "device_ids": device_ids} + + +def cmd_package(args_dict: Dict[str, Any], http_client: Any = None) -> None: + """package 子命令分发入口,由 main() 在配置/鉴权就绪后调用。""" + action = args_dict.get("package_action") + path = args_dict.get("package_path") + namespace = args_dict.get("namespace") + out_dir = args_dict.get("out") + + if not action: + raise PackageCLIError( + "缺少 package 子动作,请使用 `unilab package inspect|upload|install`" + ) + + if action == "install": + install_package( + args_dict.get("install_spec", "") or "", + run_inspect=not args_dict.get("no_inspect", False), + ) + return + + if not path: + raise PackageCLIError("缺少 --path(社区设备包目录)") + + if action == "inspect": + inspect_package(path, namespace=namespace, out_dir=out_dir) + elif action == "upload": + upload_package( + path, + http_client=http_client, + namespace=namespace, + out_dir=out_dir, + download_url=args_dict.get("download_url", "") or "", + ) + else: + raise PackageCLIError(f"未知 package 子动作:{action}") + + +# --- 内部工具 --- + + +def _run_pip_install(spec: str) -> str: + """优先 `uv pip install`、回退 `python -m pip install` 安装 spec,返回实际使用的安装器名。 + + 失败(含找不到 uv)时切下一个;全部失败抛 PackageCLIError 并带最后一次 stderr。 + """ + attempts: List[Tuple[str, List[str]]] = [ + ("uv pip install", ["uv", "pip", "install", spec]), + ("pip install", [sys.executable, "-m", "pip", "install", spec]), + ] + last_err = "" + for name, cmd in attempts: + print_status(f"尝试安装:{name} {spec}", "info") + try: + proc = subprocess.run(cmd, capture_output=True, text=True) + except FileNotFoundError: + continue # 未安装 uv 等,换下一个安装器 + if proc.returncode == 0: + return name + last_err = (proc.stderr or proc.stdout or "").strip() + raise PackageCLIError(f"安装失败:{spec}\n{last_err}") + + +def _spec_dist_name(spec: str) -> str: + """从 pip spec 取分发名;git/URL/本地路径无法在安装前可靠确定分发名,返回空串跳过设备扫描。""" + s = spec.strip() + if s.startswith(("git+", "http://", "https://", "file:", ".", "/")): + return "" + match = re.match(r"^([A-Za-z0-9][A-Za-z0-9._-]*)", s) + return match.group(1) if match else "" + + +def _local_dist_name(spec: str) -> str: + """本地目录/文件路径 spec:装完后从其 pyproject.toml 读分发名,用于补扫 @device。 + + git/URL 来源在安装前后都无法可靠读到本地 pyproject,返回空串跳过。 + """ + s = spec.strip() + if s.startswith(("git+", "http://", "https://")): + return "" + if s.startswith("file:"): + s = s[5:] + p = Path(s).expanduser() + if not p.exists(): + return "" + pkg_dir = p if p.is_dir() else p.parent + try: + return str(read_pyproject(pkg_dir).get("name") or "").strip() + except PackageCLIError: + return "" + + +def _installed_device_ids(dist_name: str) -> List[str]: + """对已安装分发的 top-level 模块做 @device AST 扫描,返回设备 ID 列表(失败返回空)。 """ + try: + from importlib.metadata import PackageNotFoundError, distribution + except Exception: + return [] + try: + dist = distribution(dist_name) + except PackageNotFoundError: + return [] + except Exception as exc: + logger.warning(f"[package] 读取已安装分发失败: {dist_name}, {exc}") + return [] + + top_modules: List[str] = [] + try: + top_text = dist.read_text("top_level.txt") or "" + top_modules = [line.strip() for line in top_text.splitlines() if line.strip()] + except Exception: + top_modules = [] + if not top_modules: + # 现代 wheel 可能不写 top_level.txt:从 RECORD/files 兜底推断顶层模块名。 + inferred: set[str] = set() + for entry in dist.files or []: + parts = entry.parts + if not parts: + continue + head = parts[0] + if head in {"..", "__pycache__"} or head.endswith((".dist-info", ".data")): + continue + if len(parts) == 1 and head.endswith(".py"): + inferred.add(head[:-3]) # 单文件模块 + elif len(parts) > 1 and "." not in head: + inferred.add(head) # 包目录 + top_modules = sorted(inferred) or [dist_name.replace("-", "_")] + + import importlib.util + + from unilabos.registry.ast_registry_scanner import scan_directory + + scan_files: List[Path] = [] + for module_name in top_modules: + try: + module_spec = importlib.util.find_spec(module_name) + except (ImportError, ValueError): + continue + if module_spec is None: + continue + if module_spec.submodule_search_locations: + for location in module_spec.submodule_search_locations: + loc_path = Path(location) + if not loc_path.is_dir(): + continue + scan_files.extend( + f + for f in loc_path.rglob("*.py") + if not f.name.startswith("__") + and not (set(f.relative_to(loc_path).parts) & ARCHIVE_EXCLUDE_DIRS) + ) + elif module_spec.origin and module_spec.origin.endswith(".py"): + scan_files.append(Path(module_spec.origin)) + + if not scan_files: + return [] + + executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="PackageInstallScan") + try: + result = scan_directory(scan_files[0].parent, executor=executor, include_files=scan_files) + finally: + executor.shutdown(wait=True) + devices = result.get("devices", {}) + return sorted(did for did, meta in devices.items() if isinstance(meta, dict)) + + +def _resolve_download_target(http_client: Any, archive_path: str, download_url: str) -> Tuple[str, str]: + """确定归档的可达地址:显式 download_url 优先(本地调试可指向静态服务),否则走 /lab/storage/token 预签名直传 OSS。""" + if download_url: + print_status(f"使用显式 download_url:{download_url}", "info") + return download_url, "" + + print_status(f"上传归档到 OSS(预签名直传):{archive_path}", "info") + try: + public_url, object_key = http_client.upload_file_to_oss(archive_path, scene="models") + except Exception as exc: + raise PackageCLIError( + f"归档预签名直传失败:{exc};可改用 --download-url 指向可达地址(本地静态服务或已有 OSS URL)" + ) from exc + if not public_url and not object_key: + raise PackageCLIError( + "OSS 直传未返回 public_url/object_key;可改用 --download-url 直接指定可达地址" + ) + return public_url, object_key + + +def _map_handles(handles: List[Any]) -> List[Dict[str, Any]]: + """把扫描出的 handles 列表映射为后端 RegHandle 友好结构(缺字段留空,不阻断上传)。""" + mapped: List[Dict[str, Any]] = [] + for handle in handles: + if isinstance(handle, dict): + mapped.append( + { + "data_key": str(handle.get("data_key", "")), + "data_source": str(handle.get("data_source", "")), + "data_type": str(handle.get("data_type", "")), + "description": str(handle.get("description", "")), + "handler_key": str(handle.get("handler_key", "")), + "io_type": str(handle.get("io_type", "")), + "label": str(handle.get("label", "")), + "side": str(handle.get("side", "")), + } + ) + return mapped + + +def _sha256_file(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def _load_toml(path: Path) -> Dict[str, Any]: + """加载 toml:优先标准库 tomllib(3.11+),回退 tomli,再回退极简解析(仅取 [project] 标量)。""" + raw = path.read_bytes() + try: + import tomllib # type: ignore + + return tomllib.loads(raw.decode("utf-8")) + except ModuleNotFoundError: + pass + try: + import tomli # type: ignore + + return tomli.loads(raw.decode("utf-8")) + except ModuleNotFoundError: + logger.warning("[package] 未找到 tomllib/tomli,使用极简解析仅提取 [project] 标量字段") + return {"project": _minimal_project_parse(raw.decode("utf-8"))} + + +def _minimal_project_parse(text: str) -> Dict[str, str]: + """极简 fallback:仅解析 [project] 段内的 name/version/description 标量。""" + result: Dict[str, str] = {} + in_project = False + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if line.startswith("[") and line.endswith("]"): + in_project = line == "[project]" + continue + if not in_project or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + if key in {"name", "version", "description"}: + result[key] = value + return result diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index e9846bb3..95132fd7 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -6,7 +6,7 @@ import gzip import json import os -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Tuple from unilabos.utils.tools import fast_dumps as _fast_dumps, fast_dumps_pretty as _fast_dumps_pretty @@ -266,29 +266,43 @@ def resource_update(self, resources: List[Dict[str, Any]]) -> requests.Response: logger.error(f"添加物料失败: {response.text}") return response.json() - def upload_file(self, file_path: str, scene: str = "models") -> requests.Response: - """ - 上传文件到服务器 - - 使用multipart/form-data格式上传文件,类似curl -F "files=@filepath" - - Args: - file_path: 要上传的文件路径 - scene: 上传场景,可选值为"user"或"models",默认为"models" + def upload_file_to_oss(self, file_path: str, scene: str = "models") -> Tuple[str, str]: + filename = os.path.basename(file_path) + # 归档为 tar.gz;Content-Type 必须与签发 token 时一致,否则 OSS V1 验签 403 + content_type = "application/gzip" + token_resp = self._session.get( + f"{self.remote_addr}/lab/storage/token", + params={"scene": scene, "filename": filename, "content_type": content_type}, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=30, + ) + if token_resp.status_code != 200: + raise RuntimeError(f"获取存储 token 失败:{token_resp.status_code} {token_resp.text}") + + payload = token_resp.json() + data = payload.get("data", payload) if isinstance(payload, dict) else {} + if not isinstance(data, dict): + data = {} + put_url = str(data.get("url") or "") + object_key = str(data.get("path") or "") + public_url = str(data.get("public_url") or "") + signed_content_type = str(data.get("content_type") or content_type) + if not put_url: + raise RuntimeError(f"存储 token 响应缺少预签名 url:{token_resp.text}") - Returns: - Response: API响应对象 - """ with open(file_path, "rb") as file: - files = {"files": file} - logger.info(f"上传文件: {file_path} 到 {scene}") - response = self._session.post( - f"{self.remote_addr}/api/account/file_upload/{scene}", - files=files, - headers={"Authorization": f"Lab {self.auth}"}, - timeout=30, # 上传文件可能需要更长的超时时间 - ) - return response + body = file.read() + logger.info(f"预签名直传 OSS: {file_path} -> {object_key or public_url}") + # 用裸 requests 直传,避免 session 默认的 Lab Authorization 头干扰 OSS URL 签名校验 + put_resp = requests.put( + put_url, + data=body, + headers={"Content-Type": signed_content_type}, + timeout=120, + ) + if put_resp.status_code not in (200, 201): + raise RuntimeError(f"OSS 直传失败:{put_resp.status_code} {put_resp.text}") + return public_url, object_key def resource_registry( self, registry_data: Dict[str, Any] | List[Dict[str, Any]], tag: str = "registry", @@ -346,6 +360,53 @@ def resource_registry( logger.error(f"注册资源失败: {response.text}") return response + def upload_package_resources( + self, + resources: List[Dict[str, Any]], + package_info: Dict[str, Any], + ) -> requests.Response: + """ + 上传社区设备包的 resources(带顶层 package_info)到 /lab/resource。 + + 与 resource_registry 同端点/同压缩方式,区别是请求体包一层 + {"package_info": <顶层>, "resources": [...]},让后端 resolvePackageInfo + 将 package_info(含 class_namespace/download_url/sha256)落到每个设备模板。 + """ + body = {"package_info": package_info, "resources": resources} + json_bytes = _fast_dumps(body) + + req_path = os.path.join(BasicConfig.working_dir, "req_package_upload.json") + try: + os.makedirs(BasicConfig.working_dir, exist_ok=True) + with open(req_path, "wb") as f: + f.write(_fast_dumps_pretty(body)) + except Exception as e: + logger.warning(f"保存包上传请求数据失败: {e}") + + compressed_body = gzip.compress(json_bytes) + headers = { + "Authorization": f"Lab {self.auth}", + "Content-Type": "application/json", + "Content-Encoding": "gzip", + } + response = self._session.post( + f"{self.remote_addr}/lab/resource", + data=compressed_body, + headers=headers, + timeout=60, + ) + + res_path = os.path.join(BasicConfig.working_dir, "res_package_upload.json") + try: + with open(res_path, "w", encoding="utf-8") as f: + f.write(f"{response.status_code}\n{response.text}") + except Exception as e: + logger.warning(f"保存包上传响应数据失败: {e}") + + if response.status_code not in [200, 201]: + logger.error(f"上传社区设备包失败: {response.status_code}, {response.text}") + return response + def request_startup_json(self) -> Optional[Dict[str, Any]]: """ 请求启动配置 diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 75677b4f..aba59b86 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -127,6 +127,9 @@ def setup(self, devices_dirs=None, upload_registry=False, complete_registry=Fals # 1. AST 静态扫描 (快速, 无需 import) self._run_ast_scan(devices_dirs, upload_registry=upload_registry, external_only=external_only) + # 社区包根目录可只放一个 registry.yaml 声明设备(device_id -> 条目), + self._load_community_device_registries(devices_dirs) + # 2. Host node 内置设备 self._setup_host_node() @@ -2182,6 +2185,53 @@ def load_device_types(self, path: os.PathLike, complete_registry: bool = False): f"(耗时 {time.time() - t0:.2f}s){extra}" ) + def _load_community_device_registries(self, devices_dirs=None): + """加载社区设备包根目录下的 registry.yaml(device_id -> 条目)。 + """ + if not devices_dirs: + return + + loaded_total = 0 + for d in devices_dirs: + d_path = Path(d).resolve() + if not d_path.is_dir(): + continue + reg_file = None + for name in ("registry.yaml", "registry.yml"): + candidate = d_path / name + if candidate.is_file(): + reg_file = candidate + break + if reg_file is None: + continue + + try: + data, _complete_data, is_valid, device_ids = self._load_single_device_file( + reg_file, complete_registry=False + ) + except Exception as e: + logger.warning(f"[UniLab Registry] 社区包 registry.yaml 加载失败: {reg_file}, 错误: {e}") + continue + if not is_valid: + continue + + runtime_data = {did: data[did] for did in device_ids if did in data} + for cfg in runtime_data.values(): + # _load_single_device_file 会按 file.stem 追加分类,这里去掉无意义的 "registry" + category = cfg.get("category") + if isinstance(category, list) and reg_file.stem in category and len(category) > 1: + category.remove(reg_file.stem) + if runtime_data: + self.device_type_registry.update(runtime_data) + loaded_total += len(runtime_data) + logger.info( + f"[UniLab Registry] 社区包 registry.yaml 设备加载: {reg_file} -> " + f"{', '.join(sorted(runtime_data))}" + ) + + if loaded_total: + logger.info(f"[UniLab Registry] 社区包 registry.yaml 设备加载完成: 共 {loaded_total} 个") + # ------------------------------------------------------------------ # 注册表信息输出 # ------------------------------------------------------------------ diff --git a/unilabos/utils/environment_check.py b/unilabos/utils/environment_check.py index 5dcff22f..b1aea9be 100644 --- a/unilabos/utils/environment_check.py +++ b/unilabos/utils/environment_check.py @@ -214,7 +214,10 @@ def install_requirements_txt(req_path: str | Path, label: str = "") -> bool: return True print_status(f"[{tag}] 缺失 {len(missing)} 个依赖: {', '.join(missing)}", "warning") - return _install_packages(missing, label=tag) + ok = _install_packages(missing, label=tag) + # 运行时新装的包必须刷新 import 缓存:进程启动时 FileFinder 已缓存各 site-packages + importlib.invalidate_caches() + return ok def check_device_package_requirements(devices_dirs: list[str]) -> bool: