Node.js 项目中对接 DeepSeek AI API,实现智能对话功能

文章类型:实战

发布者:hp

发布时间:2026-07-04

一、前言

本文详细介绍如何在 Node.js 项目中对接 DeepSeek AI API,实现智能对话功能。通过本指南,你将学会构建一个完整的 AI 对话系统,包括流式响应、对话管理、历史记录查询等核心功能。

二、技术栈

  • Node.js + Express: 后端框架
  • OpenAI SDK: 使用 openai npm 包对接 DeepSeek
  • MySQL: 数据存储
  • SSE (Server-Sent Events): 流式响应

三、环境准备

1. 安装依赖

npm install openai express mysql

2. 获取 API Key

访问 DeepSeek 官网注册并获取 API Key。

四、初始化 DeepSeek 客户端

const OpenAI = require('openai');

const client = new OpenAI({
    baseURL: 'https://api.deepseek.com',
    apiKey: 'your-api-key-here'
});

关键点:

  • 使用 OpenAI SDK,但指定 baseURL 为 DeepSeek 的 API 地址
  • DeepSeek API 与 OpenAI 接口兼容,可以无缝切换

五、核心功能实现

1. AI 问答接口(流式返回)

接口设计

POST /ai/question
Content-Type: application/json

{
  "question": "什么是 Node.js?",
  "history": ["你好", "我想学习编程"]
}

完整代码实现

router.post('/ai/question', async (req, res) => {
    const { question, history } = req.body;
    
    // 1. 参数验证
    if (!question || question.trim() === '') {
        return res.json({
            code: 400,
            message: '问题不能为空',
            data: null
        });
    }

    try {
        // 2. 设置 SSE 响应头
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Connection', 'keep-alive');
        res.setHeader('Access-Control-Allow-Origin', '*');
        
        // 3. 构建消息格式
        const messages = [];

        // 处理历史记录
        if (Array.isArray(history)) {
            history.forEach((item) => {
                if (typeof item === 'string' && item.trim() !== '') {
                    messages.push({ role: 'user', content: item });
                } else if (item && typeof item === 'object' && item.content && item.role) {
                    const role = item.role === 'assistant' ? 'assistant' : 'user';
                    messages.push({ role, content: item.content });
                }
            });
        }

        // 添加当前问题
        messages.push({ role: "user", content: question });
        
        // 4. 调用 DeepSeek API
        const stream = await client.chat.completions.create({
            model: 'deepseek-reasoner', // 推理模型
            messages: messages,
            stream: true,
        });

        // 5. 处理流式响应
        for await (const chunk of stream) {
            const delta = chunk.choices[0]?.delta;
            if (delta) {
                // 思考过程
                const reasoning = delta.reasoning_content || '';
                // 回答内容
                const content = delta.content || '';
                
                if (reasoning || content) {
                    res.write(\`data: \${JSON.stringify({ 
                        code: 200,
                        data: { reasoning, content, done: false }
                    })}\\n\\n\`);
                }
            }
        }
        
        // 6. 发送结束标记
        res.write(\`data: \${JSON.stringify({ 
            code: 200,
            message: '回答完成',
            data: { done: true }
        })}\\n\\n\`);
        res.end();
        
    } catch (error) {
        console.error('调用 DeepSeek API 失败:', error);
        
        res.write(\`data: \${JSON.stringify({
            code: 500,
            message: 'AI服务调用失败:' + error.message,
            data: { error: error.message, done: true }
        })}\\n\\n\`);
        res.end();
    }
});

核心亮点

1. deepseek-reasoner 模型特点

  • 支持推理模式,返回思考过程(reasoning_content)
  • 不支持 system 角色消息,只支持 user 和 assistant
  • 适合需要展示 AI 思考过程的场景

2. 历史记录处理

支持两种格式:

// 格式1: 字符串数组
["你好", "我想学习编程"]

// 格式2: 对象数组
[
  { role: "user", content: "你好" },
  { role: "assistant", content: "你好!有什么可以帮助你的吗?" }
]

3. SSE 流式响应

  • 实时返回 AI 生成的内容,用户体验更好
  • 前端可以逐字显示,类似打字机效果
  • 格式: data: {JSON对象}\n\n

2. 对话存储接口

数据库设计

CREATE TABLE \`chat\` (
  \`id\` int NOT NULL AUTO_INCREMENT,
  \`question\` text NOT NULL COMMENT '用户问题',
  \`answer\` text COMMENT 'AI回答',
  \`parentid\` int DEFAULT NULL COMMENT '对话组ID',
  \`createtime\` bigint NOT NULL COMMENT '创建时间戳',
  \`author\` varchar(50) DEFAULT NULL COMMENT '用户名',
  \`platform\` varchar(20) DEFAULT NULL COMMENT '平台',
  \`ip\` varchar(50) DEFAULT NULL COMMENT 'IP地址',
  \`isdisable\` tinyint DEFAULT 0 COMMENT '是否禁用',
  PRIMARY KEY (\`id\`),
  KEY \`idx_parentid\` (\`parentid\`),
  KEY \`idx_author\` (\`author\`),
  KEY \`idx_createtime\` (\`createtime\`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

parentid 设计理念

parentid 用于标识对话组:

  • 新对话: parentid = 自身 id(根节点)
  • 继续对话: parentid = 第一条消息的 id

示例:

对话1:
  id=1, question="你好", parentid=1  (根节点)
  id=2, question="介绍 Node.js", parentid=1  (属于对话1)
  id=3, question="如何安装", parentid=1  (属于对话1)

对话2:
  id=4, question="Python好学吗", parentid=4  (新对话根节点)
  id=5, question="推荐学习路径", parentid=4  (属于对话2)

代码实现

router.post('/ai/save/conversation', async (req, res) => {
    try {
        const { question, history, answer, parentid, author, platform } = req.body;
        
        // 获取IP
        const ip = req.headers['x-forwarded-for'] || req.connection.remoteAddress;
        
        // 参数验证
        if (!question || question.trim() === '') {
            return res.json({
                code: 400,
                message: '问题不能为空',
                data: null
            });
        }
        
        // 获取作者
        const authorName = author || getUserFromReq(req) || 'unknown';
        const createtime = Date.now();
        
        // 准备插入数据(过滤 emoji)
        let insertValue = {
            question: removeEmoji(question.trim()),
            answer: removeEmoji(answer || ''), 
            createtime: createtime,
            author: authorName,
            platform: platform,
            ip: ip,
            isdisable: 0,
        };
        
        // 判断是否为新对话
        const historyLength = Array.isArray(history) ? history.length : 0;
        const isNewConversation = historyLength === 1 || historyLength === 0;
        
        if (isNewConversation) {
            // 新对话:先插入,再更新 parentid 为自身 id
            insertValue.parentid = null;
            
            const sql = 'INSERT INTO chat SET ?';
            db.query(sql, insertValue, (error, results) => {
                if (error) {
                    return res.json({
                        code: 500,
                        message: '存储失败:' + error.message,
                        data: null
                    });
                }
                
                const insertId = results.insertId;
                
                // 更新 parentid 为自己的 id
                const updateSql = 'UPDATE chat SET parentid = ? WHERE id = ?';
                db.query(updateSql, [insertId, insertId], () => {
                    return res.json({
                        code: 200,
                        message: '存储成功',
                        data: { id: insertId, parentid: insertId }
                    });
                });
            });
        } else {
            // 继续对话:直接使用传递的 parentid
            if (!parentid) {
                return res.json({
                    code: 400,
                    message: '继续对话需要提供 parentid',
                    data: null
                });
            }
            
            insertValue.parentid = parentid;
            
            const sql = 'INSERT INTO chat SET ?';
            db.query(sql, insertValue, (error, results) => {
                if (error) {
                    return res.json({
                        code: 500,
                        message: '存储失败:' + error.message,
                        data: null
                    });
                }
                
                return res.json({
                    code: 200,
                    message: '存储成功',
                    data: { id: results.insertId, parentid: parentid }
                });
            });
        }
        
    } catch (error) {
        return res.json({
            code: 500,
            message: '存储失败:' + error.message,
            data: null
        });
    }
});

3. 历史记录查询

接口设计

POST /ai/history

{
  "page": 1,
  "pageSize": 10,
  "author": "username",  // 可选
  "question": "关键词",   // 可选
  "startTime": "2024-01-01",  // 可选
  "endTime": "2024-12-31"     // 可选
}

核心查询逻辑

每个 parentid 只取第一条记录(去重),然后为每个对话组查询完整对话链:

// 核心:每个 parentid 只取第一条记录(去重)
const baseJoinSql = \`
    FROM chat c
    INNER JOIN (
        SELECT parentid, MIN(id) as min_id
        FROM chat
        GROUP BY parentid
    ) t ON c.id = t.min_id
\`;

// 查询总数
const countSql = \`
    SELECT COUNT(*) as total
    FROM (
        SELECT c.parentid
        \${baseJoinSql}
        \${filterSql}
    ) temp
\`;

// 查询列表
const listSql = \`
    SELECT c.id, c.question, c.answer, c.createtime, c.author, c.parentid, c.platform
    \${baseJoinSql}
    \${filterSql}
    ORDER BY c.createtime DESC 
    LIMIT ?, ?
\`;

// 为每个对话组查询完整对话链
const listWithChildren = await Promise.all(list.map(async (item) => {
    const childrenSql = \`SELECT id, question, answer, createtime, author, parentid 
                       FROM chat 
                       WHERE parentid = ? 
                       ORDER BY createtime ASC\`;
    
    return new Promise((resolve) => {
        db.query(childrenSql, [item.parentid], (err, childrenData) => {
            resolve({
                ...item,
                childrenlist: err ? [] : (childrenData || [])
            });
        });
    });
}));

查询优化要点

  • 去重查询: 使用 MIN(id) 确保每个 parentid 只返回第一条记录
  • 嵌套查询: 为每个对话组查询完整对话链(childrenlist)
  • 并发执行: 使用 Promise.all 同时执行总数和列表查询
  • 灵活过滤: 支持作者、关键词、时间范围多维度过滤




六、最佳实践

1. API Key 安全管理

// 使用环境变量
require('dotenv').config();

const client = new OpenAI({
    baseURL: 'https://api.deepseek.com',
    apiKey: process.env.DEEPSEEK_API_KEY
});

// .env 文件
DEEPSEEK_API_KEY=sk-xxxxxxxxxxxxxxxx

2. 请求限流

const rateLimit = require('express-rate-limit');

const limiter = rateLimit({
    windowMs: 60 * 1000,  // 1分钟
    max: 10,              // 最多10个请求
    message: '请求过于频繁,请稍后再试'
});

router.post('/ai/question', limiter, async (req, res) => {
    // ...
});

3. 错误重试机制

async function callDeepSeekWithRetry(messages, maxRetries = 3) {
    for (let i = 0; i < maxRetries; i++) {
        try {
            const stream = await client.chat.completions.create({
                model: 'deepseek-reasoner',
                messages: messages,
                stream: true,
            });
            return stream;
        } catch (error) {
            if (i === maxRetries - 1) throw error;
            
            // 等待后重试
            await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
        }
    }
}

4. 数据库连接池

const mysql = require('mysql');

const pool = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'chatdb',
    connectionLimit: 10  // 连接池大小
});

// 使用连接池
pool.query(sql, params, (error, results) => {
    // ...
});

5. 异步改造(使用 async/await)

const util = require('util');
const query = util.promisify(pool.query).bind(pool);

router.post('/ai/save/conversation', async (req, res) => {
    try {
        const { question, answer } = req.body;
        
        const insertValue = {
            question: removeEmoji(question),
            answer: removeEmoji(answer),
            createtime: Date.now()
        };
        
        const result = await query('INSERT INTO chat SET ?', insertValue);
        
        return res.json({
            code: 200,
            message: '存储成功',
            data: { id: result.insertId }
        });
        
    } catch (error) {
        return res.json({
            code: 500,
            message: '存储失败:' + error.message,
            data: null
        });
    }
});

七、常见问题 FAQ

Q1: 如何切换不同的 DeepSeek 模型?

// 使用 deepseek-chat(普通对话模型)
const stream = await client.chat.completions.create({
    model: 'deepseek-chat',  // 不返回思考过程
    messages: messages,
    stream: true,
});

// 使用 deepseek-reasoner(推理模型)
const stream = await client.chat.completions.create({
    model: 'deepseek-reasoner',  // 返回思考过程
    messages: messages,
    stream: true,
});

Q2: 如何处理长对话历史?

// 限制历史记录长度
function truncateHistory(history, maxLength = 10) {
    if (history.length <= maxLength) {
        return history;
    }
    // 保留最近的 N 条记录
    return history.slice(-maxLength);
}

Q3: 如何估算 API 成本?

// 统计 token 使用量
const response = await client.chat.completions.create({
    model: 'deepseek-reasoner',
    messages: messages,
    stream: false,  // 非流式可以获取 token 统计
});

console.log('Token 使用量:', {
    prompt_tokens: response.usage.prompt_tokens,
    completion_tokens: response.usage.completion_tokens,
    total_tokens: response.usage.total_tokens
});

Q4: SSE 连接断开怎么办?

// 前端自动重连
function connectSSE(url) {
    let eventSource;
    
    function connect() {
        eventSource = new EventSource(url);
        
        eventSource.onopen = () => {
            console.log('连接成功');
        };
        
        eventSource.onerror = () => {
            console.log('连接断开,3秒后重连...');
            eventSource.close();
            setTimeout(connect, 3000);
        };
        
        eventSource.onmessage = (event) => {
            // 处理消息
        };
    }
    
    connect();
}

八、总结

本文介绍了 Node.js 对接 DeepSeek API 的完整方案,包括:

  1. 核心功能: 流式对话、历史存储、记录查询
  2. 技术亮点: SSE 实时响应、parentid 对话管理、emoji 处理
  3. 工程实践: 错误处理、请求限流、日志记录
  4. 性能优化: 数据库索引、缓存策略、连接复用



下一篇暂无
评论
0条评论遵守法律,文明用语,共同建设文明评论区

暂无评论,快来发表第一条评论吧~