JXWAFJXWAF
首页
安全模型服务
JXWAF
WebTDS
GitHub
首页
安全模型服务
JXWAF
WebTDS
GitHub
  • 安全模型服务

    • 产品介绍

产品介绍

基于多维稀疏注意力机制的大模型语义缓存系统,通过在线蒸馏获得大模型的安全检测能力,实现高并发、低成本、低幻觉的安全模型服务。目前已上线 Web 安全检测模型,可精准识别 SQL 注入、XSS、命令执行等Web 攻击,后续将陆续推出更多安全模型。

快速上手

1. 访问控制台

浏览器打开 https://model.jxwaf.com。首次使用需注册账号,强烈建议开启 OTP 双因素认证以增强账户安全。

2. 配置 AI 模型

进入「AI 模型配置」页面:

  • 选择模型(如 deepseek-v4-flash)
  • 填入 DeepSeek API Key,支持一键验证有效性
  • 按需开关「思考模式」:开启时 AI 执行深度推理链,准确率更高;关闭时速度更快、成本更低

3. 生成 API Key

进入「API 密钥管理」页面创建 Key,输入标识名后系统生成唯一密钥。

4. 第一次调用

curl -X POST https://model.jxwaf.com/jxwaf_model/web_attack_check \
  -H "Content-Type: application/json" \
  -H "jxwaf-model-auth: <你的API Key>" \
  -d "id=1' OR '1'='1"

首次提交会返回 unknown,等待大约 5秒 后重试即可获取分析结果(开启思考模式则1分钟后能有结果) 。


控制台功能

模块说明
AI 模型配置配置大模型提供商、API Key 及思考模式
API 密钥管理创建、查看、删除调用接口所需的 API Key
AI模型蒸馏记录查询AI模型详细蒸馏记录
用量统计近 30 天的总请求数、缓存命中数、AI 分析数、缓存命中率及每日趋势图。缓存命中率越高,AI 调用成本越低

攻击检测 API

基本信息

  • 方法:POST
  • 地址:/jxwaf_model/web_attack_check
  • 认证:请求头 jxwaf-model-auth 传入 API Key

请求示例

curl -X POST https://model.jxwaf.com/jxwaf_model/web_attack_check \
  -H "Content-Type: application/json" \
  -H "jxwaf-model-auth: <你的API Key>" \
  -d "id=1' OR '1'='1"

响应示例

判定为攻击:

{
  "status": "ok",
  "result": "attack",
  "token": "a1b2c3d4e5f6...",
  "ai_model": "deepseek-v4-flash",
  "attack_type": ["sql", "rce"]
}

判定为安全:

{
  "status": "ok",
  "result": "safe",
  "token": "a1b2c3d4e5f6...",
  "ai_model": ""
}

待分析(首次提交):

{
  "status": "ok",
  "result": "unknown",
  "token": "a1b2c3d4e5f6...",
  "ai_model": ""
}

认证失败:

{
  "status": "error",
  "result": "auth_fail"
}

响应字段说明

字段类型描述
statusstringok – 请求成功;error – 请求失败
resultstringsafe – 安全;attack – 攻击;unknown – 待分析(后台异步处理中);auth_fail – 认证失败
tokenstring请求内容的唯一哈希标识,可用于日志追踪或二次查询
ai_modelstring返回 attack/safe 时,展示所用 AI 模型名称;否则为空
attack_typearray仅在 result=attack 时返回,列出命中的攻击类型

攻击类型

平台当前支持 7 类 Web 攻击:

标识名称典型特征
sqlSQL 注入' OR '1'='1、UNION SELECT
xss跨站脚本<script>alert(1)</script>
rce远程命令执行; ls、$(whoami)
code_exec任意代码执行SSTI、OGNL/SpEL 表达式
path_traversal路径遍历../../etc/passwd
xxeXML 外部实体注入利用 XML 解析器读取文件或发起 SSRF
other其他攻击SSRF、NoSQL 注入、CRLF 注入等

最佳实践:收到 unknown 后,建议客户端间隔 5 秒重试 1 次。重试不产生额外 AI 调用费用。


Python 代码示例

以下示例演示如何对接 Web 安全检测 API,支持命令行参数、文件输入及自动等待分析完成的轮询机制。

依赖安装:

pip3 install requests

客户端封装

#!/usr/bin/env python3
"""
JXWAF 安全模型服务 Python 客户端
支持单条/批量检测、自动轮询等待分析结果
"""
import os
import sys
import json
import time
import argparse
from pathlib import Path

import requests


class JxwafClient:
    def __init__(self, api_key=None, base_url=None):
        self.api_key = api_key or os.environ.get("JXWAF_API_KEY", "")
        self.base_url = (base_url or os.environ.get("JXWAF_BASE_URL", "")).rstrip("/")
        if not self.api_key:
            raise ValueError("API Key 未设置,请通过参数或环境变量 JXWAF_API_KEY 配置")
        if not self.base_url:
            raise ValueError("服务地址未设置,请通过参数或环境变量 JXWAF_BASE_URL 配置")
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "jxwaf-model-auth": self.api_key,
        })

    def _submit(self, payload: str) -> dict:
        resp = self.session.post(
            f"{self.base_url}/jxwaf_model/web_attack_check",
            data=payload,
            timeout=120,
        )
        resp.raise_for_status()
        return resp.json()

    def check(self, payload: str, poll_interval: int = 2, max_wait: int = 300) -> dict:
        """提交检测并等待分析完成。"""
        submitted = False
        deadline = time.time() + max_wait

        while time.time() < deadline:
            result = self._submit(payload)
            if result.get("status") == "error":
                return result

            verdict = result.get("result", "")
            if verdict in ("safe", "attack"):
                return result

            # 首次提交后,后续使用 token 查询加速命中缓存
            if not submitted:
                submitted = True
                payload = json.dumps({"token": result.get("token", "")})

            time.sleep(poll_interval)

        return result

    def check_batch(self, items: list, poll_interval: int = 2, max_wait: int = 300, verbose: bool = False) -> list:
        """批量检测,支持实时进度输出。items 可以是字符串列表或 (标签, 数据) 元组列表。"""
        results = []
        for i, item in enumerate(items):
            if isinstance(item, (list, tuple)):
                label, payload = item
            else:
                label, payload = None, item

            if verbose:
                tag = f"[{label}] " if label else ""
                print(f"[{i+1}/{len(items)}] {tag}提交检测...", end=" ", flush=True)

            r = self.check(str(payload), poll_interval, max_wait)
            results.append(r)

            if verbose:
                verdict = r.get("result", "?")
                attack_type = r.get("attack_type", [])
                if verdict == "attack":
                    print(f"⚠ 攻击 ({', '.join(attack_type)})")
                elif verdict == "safe":
                    print("✓ 安全")
                elif verdict == "unknown":
                    print("⏳ 超时未完成")
                else:
                    print(f"✗ {verdict}")

        return results


def load_payloads(source: str) -> list[str]:
    """从文件加载待检测数据,每行一条纯文本请求。以 '#' 开头的行视为注释,空行自动忽略。"""
    path = Path(source)
    if not path.exists():
        raise FileNotFoundError(f"文件不存在: {source}")
    payloads = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            payloads.append(line)
    return payloads


def format_result(result: dict) -> str:
    """格式化检测结果为可读字符串。"""
    verdict = result.get("result", "?")
    token = result.get("token", "")
    model = result.get("ai_model", "")
    attack_type = result.get("attack_type", [])
    if verdict == "attack":
        return f"⚠ 攻击 | 类型: {', '.join(attack_type)} | 模型: {model} | Token: {token}"
    elif verdict == "safe":
        return f"✓ 安全 | 模型: {model} | Token: {token}"
    elif verdict == "unknown":
        return f"⏳ 待分析 | Token: {token}"
    else:
        return f"✗ {verdict} | Token: {token}"


def main():
    parser = argparse.ArgumentParser(description="JXWAF安全模型服务 - Web安全检测客户端")
    parser.add_argument("payload", nargs="?", help="待检测数据(也可从 stdin 读取)")
    parser.add_argument("-k", "--api-key", default=os.environ.get("JXWAF_API_KEY", ""),
                        help="API Key(可通过环境变量 JXWAF_API_KEY 设置)")
    parser.add_argument("-s", "--server", default=os.environ.get("JXWAF_BASE_URL", ""),
                        help="服务地址(可通过环境变量 JXWAF_BASE_URL 设置)")
    parser.add_argument("-f", "--file", help="从文件读取批量数据(每行一条纯文本请求)")
    parser.add_argument("-o", "--output", help="结果输出文件(JSON 格式)")
    parser.add_argument("-i", "--interval", type=int, default=2, help="轮询间隔秒数(默认 2)")
    parser.add_argument("-w", "--max-wait", type=int, default=300, help="单条最大等待秒数(默认 300)")
    parser.add_argument("-v", "--verbose", action="store_true", help="打印详细过程")
    args = parser.parse_args()

    client = JxwafClient(api_key=args.api_key, base_url=args.server)

    # 批量检测模式
    if args.file:
        items = load_payloads(args.file)
        if not items:
            print("文件中没有待检测数据", file=sys.stderr)
            sys.exit(1)
        if args.verbose:
            print(f"从文件加载 {len(items)} 条数据\n")
        results = client.check_batch(items, args.interval, args.max_wait, verbose=args.verbose)

        print(f"\n===== 检测结果 ({len(results)} 条) =====")
        attack_count = sum(1 for r in results if r.get("result") == "attack")
        safe_count = sum(1 for r in results if r.get("result") == "safe")
        unknown_count = sum(1 for r in results if r.get("result") == "unknown")
        print(f"攻击: {attack_count}  安全: {safe_count}  待分析: {unknown_count}\n")

        for i, (item, r) in enumerate(zip(items, results)):
            # 纯文本模式下直接显示序号
            print(f"[{i+1}] " + format_result(r))

        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            print(f"\n结果已保存至: {args.output}")
        return

    # 单条检测模式
    payload = args.payload
    if not payload:
        if not sys.stdin.isatty():
            payload = sys.stdin.read().strip()
        else:
            parser.print_help()
            sys.exit(1)

    r = client.check(payload, args.interval, args.max_wait)
    print(format_result(r))
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(r, f, ensure_ascii=False, indent=2)


if __name__ == "__main__":
    main()

使用方式

1. 环境变量配置(推荐)

export JXWAF_API_KEY="你的API Key"
export JXWAF_BASE_URL="https://your-instance.com"

2. 单条检测

# 命令行直接传入
python3 jxwaf_client.py "id=1' OR '1'='1"

# 从 stdin 管道
echo "id=1 UNION SELECT username,password FROM users--" | python3 jxwaf_client.py

3. 批量检测(仅支持纯文本行)

准备数据文件 payloads.txt,每行一条请求内容,# 开头为注释:

# SQL 注入测试
id=1' OR '1'='1
# XSS 测试
<script>alert(1)</script>
# 路径遍历
../../etc/passwd
# 命令注入
; ls -la /tmp

执行命令:

python3 jxwaf_client.py -f payloads.txt -v -o result.json

命令行参数一览

参数说明
payload待检测数据(位置参数,可从 stdin 读取)
-k, --api-keyAPI Key(优先级高于环境变量)
-s, --server服务地址(优先级高于环境变量)
-f, --file从文件读取批量数据(纯文本,每行一条请求)
-o, --output将结果输出为 JSON 文件
-i, --interval轮询间隔秒数(默认 2)
-w, --max-wait单条最大等待秒数(默认 300)
-v, --verbose打印实时进度

常见问题

Q: 首次调用返回 unknown 正常吗?
A: 正常。平台对未分析过的请求先返回 unknown,同时后台异步调用 AI 分析。间隔 10 秒重试即可获取结果;相同内容的后续请求将直接命中缓存。

Q: 如何降低 AI 调用成本?
A: 可在「AI 模型配置」中关闭思考模式,使用快速推理进一步节省费用。

Q: API Key 丢失了怎么办?
A: 请在控制台删除旧 Key 并创建新 Key。

Q: OTP 验证码错误如何解决?
A: TOTP 依赖精确的时间同步,请确保手机与网络时间保持一致。若仍无法恢复,请联系管理员。

Q: 支持哪些 AI 模型?
A: 当前支持 DeepSeek 平台,可选模型包括 deepseek-v4-flash(快速推理)和 deepseek-v4-pro(深度推理)。

Q: 检测准确率如何保证?
A: 平台采用 3 次独立分析 + 最多 2 轮一致性重试 的投票机制,所有分析结果完全一致才输出结论,有效降低大模型幻觉带来的误报和漏报。