WebSocket 与 SSE 总览
WebSocket 基础
- 定位:WebSocket 是一条在 HTTP 握手后升级的全双工连接,允许客户端与服务器在同一 TCP 通道上双向推送数据,省去了反复轮询。
- 握手流程:
- 客户端通过
Upgrade: websocket头发起 HTTP 请求; - 服务器响应
101 Switching Protocols,双方协商子协议、压缩等扩展; - 后续通信转为 WebSocket 帧,遵循
FIN/Opcode/Payload格式。 - 特点:状态常驻、头部极小、支持二进制/文本帧、可以按房间划分广播。
- 适用场景:IM/聊天室、协同编辑、实时仪表盘、在线游戏等需要低延迟双向通信的业务。
Node.js + Socket.IO 示例
socket.io 封装了握手、心跳、自动重连、回退轮询等细节,通过自带协议+服务端组件(socket.io)与客户端(socket.io-client)协作。
目录与依赖初始化
mkdir websocket-demo && cd websocket-demo
npm init -y
npm install socket.io express
服务端示例
// server.js
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = new Server(server, {
cors: { origin: '*' },
pingInterval: 10000, // 心跳频率
});
io.on('connection', (socket) => {
console.log('client connected:', socket.id);
// 自定义事件:客户端发来 chat-message
socket.on('chat-message', (payload) => {
// 发给当前客户端的回执
socket.emit('chat-ack', { id: payload.id, status: 'OK' });
// 广播给所有其他客户端
socket.broadcast.emit('chat-message', {
...payload,
from: socket.id,
});
});
// 服务器主动广播
socket.on('join-room', (room) => {
socket.join(room);
io.to(room).emit('room-notify', {
room,
memberCount: io.sockets.adapter.rooms.get(room)?.size || 0,
});
});
socket.on('disconnect', (reason) => {
console.log('disconnect:', socket.id, reason);
});
});
server.listen(3000, () => {
console.log('socket.io server listening on 3000');
});
客户端示例
<!-- client.html -->
<script src="https://cdn.socket.io/4.7.4/socket.io.min.js"></script>
<script>
const socket = io('http://localhost:3000', { transports: ['websocket'] });
socket.on('connect', () => {
console.log('connected', socket.id);
socket.emit('join-room', 'general');
socket.emit('chat-message', { id: Date.now(), text: 'Hello everyone!' });
});
socket.on('chat-message', (msg) => {
console.log('[broadcast]', msg);
});
socket.on('room-notify', (data) => {
console.log(`[room ${data.room}] members:`, data.memberCount);
});
socket.on('disconnect', (reason) => {
console.log('connection closed:', reason);
});
</script>
自定义事件与广播策略
socket.on('<event>', handler):监听自定义事件,如chat-message、typing。socket.emit:只发给当前连接(常用于回执、私聊)。socket.broadcast.emit:发给除自己外的所有客户端,适合房间外的全局广播。io.emit:发送给所有连接,包括自己,适合系统消息。io.to(room).emit:房间级广播,结合socket.join(room)/socket.leave(room)管理订阅。socket.compress(false).emit等:对大流量广播可控制压缩、Acks 等参数以满足性能需求。
SSE(Server-Sent Events)
概念与特点
- 基于 HTTP/1.1 持久连接,由服务器单向推送文本事件到浏览器
EventSource。 - 事件格式是
text/event-stream,每个事件包含event、data、id等字段,以\n\n分隔。 - 浏览器原生支持自动重连、
Last-Event-ID断点续传,适合通知、行情、日志流式输出。 - SSE 是单向的(服务端 -> 客户端),若需要客户端上行消息仍要配合 POST/AJAX;而 WebSocket 是全双工。
Node 服务端示例
// sse-server.js
const express = require('express');
const app = express();
app.get('/events', (req, res) => {
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
res.flushHeaders();
let counter = 0;
const timer = setInterval(() => {
counter += 1;
res.write(`event: tick\n`);
res.write(`id: ${counter}\n`);
res.write(`data: ${JSON.stringify({ counter, ts: Date.now() })}\n\n`);
}, 2000);
req.on('close', () => {
clearInterval(timer);
res.end();
});
});
app.listen(4000, () => console.log('SSE server on 4000'));
浏览器客户端示例
<script>
const evtSource = new EventSource('http://localhost:4000/events');
evtSource.addEventListener('tick', (evt) => {
console.log('SSE tick payload:', JSON.parse(evt.data));
});
evtSource.onerror = (err) => {
console.error('SSE connection lost', err);
};
</script>
与 WebSocket 对比
- 通信方向:SSE 只有服务端 -> 客户端;WebSocket 支持双向。
- 协议/兼容:SSE 依赖 HTTP,受代理与防火墙更友好;WebSocket 需
Upgrade,对旧环境可能有限制。 - 吞吐 & 二进制:SSE 仅传文本,适合中低频推送;WebSocket 带二进制帧,对高频/大数据更合适。
- 心跳:SSE 内置重连机制;WebSocket 需应用层心跳(Socket.IO 已封装)。
在实际项目中,可按场景选择:例如后台配置、行情推送可用 SSE;需要实时协作、IM、控制面板等则优先 WebSocket/Socket.IO。
HTTP/2 与长连接
核心特性
- 继承 HTTP/1.1 的持久连接模型,通过 多路复用 在一条 TCP 长连接上并发多条 Stream,避免队头阻塞。
- 采用二进制帧层(Frame Layer),将请求/响应拆成 HEADERS、DATA 等帧以帧 ID 区分流。
- HPACK 头部压缩+静态表显著降低重复 header 的传输开销。
- 支持服务器主动推送(Server Push,现今浏览器已逐步弃用,但在特定客户端仍可用),也可通过长时间保持 DATA 帧实现流式返回。
Node.js http2 Demo
mkdir http2-demo && cd http2-demo
npm init -y
创建 server.mjs:
import fs from 'node:fs';
import http2 from 'node:http2';
// 浏览器访问需 https,可以用 mkcert/openssl 生成本地证书;示例使用自签证书
const server = http2.createSecureServer({
key: fs.readFileSync('./localhost-key.pem'),
cert: fs.readFileSync('./localhost-cert.pem'),
allowHTTP1: true, // 兼容老客户端
});
server.on('stream', (stream, headers) => {
const path = headers[':path'];
if (path === '/time') {
// 以 DATA 帧流式推送,实现长连接下的实时更新
const timer = setInterval(() => {
stream.write(JSON.stringify({ ts: Date.now() }) + '\n');
}, 2000);
stream.on('close', () => clearInterval(timer));
return;
}
stream.respond({
'content-type': 'application/json',
':status': 200,
});
stream.end(JSON.stringify({ hello: 'http2' }));
});
server.listen(8443, () => {
console.log('HTTP/2 server running https://localhost:8443');
});
客户端验证
- 浏览器:使用
fetch('https://localhost:8443/time')需信任自签证书,可在 DevTools 网络面板确认h2协议,响应会持续追加数据。 - CLI:
curl --http2 -k https://localhost:8443/time,-k跳过证书校验,可看到每 2 秒一行数据。
也可写一个 Node 客户端:
// client.mjs
import http2 from 'node:http2';
const client = http2.connect('https://localhost:8443', {
rejectUnauthorized: false,
});
const req = client.request({ ':path': '/time' });
req.on('data', (chunk) => {
process.stdout.write('tick ' + chunk.toString());
});
req.on('close', () => client.close());
req.end();
与 WebSocket/SSE 的取舍
- HTTP/2 长连接仍遵循请求-响应语义,客户端先发 HEADERS 后才能收到 DATA;WebSocket 连接一旦升级后就不再有请求概念。
- 对纯浏览器推送场景,若已升级 HTTP/2,可用长响应或 SSE 代替额外的 WS 链路,减少心跳管理;但 HTTP/2 仍不支持客户端主动上行而不发请求。
- 若需要双向通信或跨 TCP 针对房间广播,WebSocket 更直接;若是单向通知且基础设施已支持 HTTP/2,则利用其多路复用/压缩可以让接口更加高效。
nodejs 的事件模型
事件循环概念
- Node 架构:单线程 JS 执行线程 + 背景中的 libuv 线程池。所有 I/O 任务在内核或线程池中完成后,通过事件循环(Event Loop)调度回调进入 JS 主线程。
- 事件循环阶段(每一轮 tick 都按顺序执行):
- timers:到期的
setTimeout/setInterval; - pending callbacks:部分系统级回调,如 TCP errors;
- idle/prepare:内部使用;
- poll:获取新的 I/O 事件,并执行相关回调。若为空会视情况进入下一阶段或阻塞等待;
- check:专门执行
setImmediate; - close callbacks:例如
socket.on('close', ...)。 - 微任务:
process.nextTick在每个阶段结束都会优先执行,Promise microtask 紧随其后;因此合理使用nextTick/queueMicrotask可以调整回调执行顺序。
与浏览器事件循环的差异
- 阶段模型不同:浏览器遵循 HTML 标准的 Task/Microtask 队列(macro task 如
setTimeout、DOM 事件、fetch回调等;micro task 如 Promise、MutationObserver),每次执行完一个宏任务就清空所有微任务。Node 由 libuv 驱动,拥有 timers/poll/check 等 6 个阶段,每个阶段结束后才轮询 microtask,并在process.nextTick专队列后才执行 Promise 微任务。 - API 差异:浏览器没有
setImmediate、process.nextTick,而是以MessageChannel、requestAnimationFrame、postMessage等手段来影响调度;Node 的setImmediate绑定 check 阶段,nextTick用于打断当前阶段。相同的setTimeout(fn, 0)在浏览器最早也会在 4ms(>=5 次嵌套)后执行,而 Node 在 timers 阶段只要达到 0ms 就会调度。 - 渲染与 UI 约束:浏览器事件循环与渲染管线耦合——
requestAnimationFrame、layout/paint 会在宏任务之间执行,若 JS 长时间占用主线程会阻塞页面渲染。Node 不存在渲染阶段,CPU 密集逻辑只会阻塞 I/O 回调进入主线程。 - 多 event loop:浏览器中每个 window/frame/worker 拥有独立事件循环,彼此通过 postMessage 交互;Node 只有一个主事件循环,若需要并行可借助
worker_threads/cluster来创建额外线程/进程。 - 微任务优先级:浏览器 microtask(Promise 等)在宏任务完成后统一执行;Node 则是在
process.nextTick→ Promise microtask → 下一阶段的顺序下执行,因此过度使用nextTick可能让 I/O 饥饿,而浏览器则主要担心 microtask 长时间未清空导致渲染延迟。
事件循环示例
// event-loop.js
const fs = require('fs');
setTimeout(() => console.log('timeout'), 0);
setImmediate(() => console.log('immediate'));
process.nextTick(() => console.log('nextTick'));
fs.readFile(__filename, () => {
console.log('I/O callback');
setTimeout(() => console.log('timeout in I/O'), 0);
setImmediate(() => console.log('immediate in I/O'));
});
console.log('sync');
可能的输出(取决于 libuv poll 阶段状态):
sync
nextTick // 微任务最先执行
timeout / immediate (顺序非固定)
I/O callback
immediate in I/O // I/O 回调阶段后立即执行 check 阶段
timeout in I/O
通过这个脚本可以观察 timers/poll/check 在不同上下文下的调度次序。
EventEmitter 与自定义事件
Node 内部大量 API 基于 EventEmitter(http.Server、net.Socket 等)。我们也可以通过继承或直接实例化来发送/监听自定义事件。
// custom-event.js
const EventEmitter = require('events');
class TaskBus extends EventEmitter {
addTask(task) {
// 模拟异步执行完成后触发自定义事件
setTimeout(() => {
this.emit('task:done', {
id: task.id,
result: task.work.toUpperCase(),
});
}, 100);
}
}
const bus = new TaskBus();
bus.on('task:done', (payload) => {
console.log('Task finished:', payload);
});
bus.once('task:done', () => {
console.log('This runs only for the first completion');
});
bus.addTask({ id: 1, work: 'compile' });
bus.addTask({ id: 2, work: 'test' });
要点:
on/addListener:添加长期监听;once:一次性监听并在触发后自动移除。emit(event, ...args):同步触发监听器,按注册顺序执行。若希望异步触发可在 emit 前后使用setImmediate.removeListener/off与removeAllListeners用于释放资源,避免泄漏。
事件驱动模式有利于解耦生产者/消费者:I/O 完成、业务状态变化等都封装为事件。结合事件循环的调度,Node 既能保持单线程的编程模型,又能充分利用异步 I/O 的高并发能力。
node 底层异步 IO 与线程池
libuv 模型
- Node 的异步 IO 能力来自 C 层的 libuv:它向操作系统请求异步文件/TCP/DNS 等操作,并通过事件循环在完成后触发 JS 回调。
- 对于真正异步的内核调用(多数 socket、epoll/kqueue 支持的文件描述符),libuv 只需注册事件并在可读/可写时向上通知,不占用线程池。
- 对于无法提供非阻塞接口的操作(如部分文件系统调用、DNS 解析、压缩/加密等 CPU 密集任务),libuv 会将任务派发到一个固定大小的线程池中执行,完成后再把结果投递回事件循环。
线程池细节
- 默认大小为 4,可通过环境变量
UV_THREADPOOL_SIZE(最多 128)调整,例如UV_THREADPOOL_SIZE=8 node server.js。 - 使用线程池的 Node API 包括:
fs.readFile/writeFile、crypto.pbkdf2、zlib、dns.lookup(默认)、fs.stat等。 - 线程池中的任务一旦被占满,后续同类操作会排队等待;因此高并发文件 IO 场景建议将同步读写改为流式
fs.createReadStream/createWriteStream,或适当增大池规模。
示例:对比 IO 与 CPU 密集任务
// threadpool.js
const crypto = require('crypto');
const fs = require('fs');
console.time('fs');
for (let i = 0; i < 6; i++) {
fs.readFile(__filename, () => {
console.timeLog('fs', 'read file done', i);
});
}
console.time('pbkdf2');
for (let i = 0; i < 6; i++) {
crypto.pbkdf2('secret', 'salt', 1e6, 64, 'sha512', () => {
console.timeLog('pbkdf2', 'hash done', i);
});
}
运行后你会发现 pbkdf2 的回调最多同时处理 4 个任务,因为线程池默认仅提供 4 条工作线程;而 fs.readFile 也会占用同一个池。若对 CPU 密集任务并行度有更高需求,可以在启动时调大 UV_THREADPOOL_SIZE,但要注意机器核数和上下文切换成本。
与事件循环的协作
- 线程池执行完任务后,会将完成事件放入
pending callbacks阶段队列;事件循环会在下一轮处理它们,从而调用 JS 回调。 - 若 JS 层长时间处理 CPU 逻辑,事件循环无法尽快回到 poll 阶段,线程池排队的结果也无法被消费,这就是单线程阻塞问题。解决方案包括:拆分任务、使用
worker_threads、或迁移到专用进程。
实践建议
- 避免在请求线程中直接执行
crypto.pbkdf2、zlib.gzip等重 CPU 操作,可放到worker_threads或外部服务。 - 大量文件操作时使用流式 API,或调节
UV_THREADPOOL_SIZE并结合批量/限流控制。 - 对网络 IO,尽量使用真正异步的 socket/HTTP 客户端,借助 libuv epoll/kqueue 能力可以在单线程下同时管理数万连接。
总体来说,Node 的“异步 IO + 线程池”模式将 CPU 与 IO 工作拆分:IO 事件由内核通知、CPU 耗时由线程池承担,JS 主线程只负责驱动状态机和业务逻辑,从而达到高并发、高吞吐的效果。
Node 连接 MySQL / MongoDB / Redis
MySQL:事务型业务
适用于电商订单、库存等强一致场景。推荐 mysql2,支持 Promise 与连接池。
npm install mysql2
// mysql.js
const mysql = require('mysql2/promise');
const pool = mysql.createPool({
host: 'localhost',
user: 'app_user',
password: 'secret',
database: 'shop',
waitForConnections: true,
connectionLimit: 10,
});
async function createOrder(userId, items) {
const conn = await pool.getConnection();
try {
await conn.beginTransaction();
const [orderResult] = await conn.execute(
'INSERT INTO orders(user_id, status) VALUES(?, ?)',
[userId, 'pending']
);
for (const item of items) {
await conn.execute(
'INSERT INTO order_items(order_id, sku, qty) VALUES(?,?,?)',
[orderResult.insertId, item.sku, item.qty]
);
}
await conn.commit();
return orderResult.insertId;
} catch (err) {
await conn.rollback();
throw err;
} finally {
conn.release();
}
}
实战提示:
- 所有参数使用
?占位 + 绑定数组,防 SQL 注入; - 把长连接集中托管在
pool中,设置waitForConnections避免连接数耗尽; - 对报表、只读接口可配置从库连接,实现主写从读;
- 结合
pool.on('connection', conn => conn.query('SET SESSION sql_mode=...'))做统一设置。
ORM 示例:Sequelize
在中大型项目中可使用 ORM 来抽象模型、关系与迁移。
npm install sequelize mysql2
// sequelize.js
const { Sequelize, DataTypes } = require('sequelize');
const sequelize = new Sequelize('shop', 'app_user', 'secret', {
host: 'localhost',
dialect: 'mysql',
logging: false,
pool: { max: 10, idle: 10000 },
});
const User = sequelize.define('User', {
email: { type: DataTypes.STRING, unique: true },
nickname: DataTypes.STRING,
});
const Order = sequelize.define('Order', {
status: {
type: DataTypes.ENUM('pending', 'paid', 'shipped'),
defaultValue: 'pending',
},
totalPrice: DataTypes.DECIMAL(10, 2),
});
User.hasMany(Order, { foreignKey: 'userId' });
Order.belongsTo(User, { foreignKey: 'userId' });
async function init() {
await sequelize.sync({ alter: true }); // 生产环境建议用 migration
const user = await User.create({ email: 'demo@test.com', nickname: 'Demo' });
await Order.create({ userId: user.id, totalPrice: 199.99 });
const orders = await Order.findAll({
include: [{ model: User, attributes: ['email'] }],
});
console.log(JSON.stringify(orders, null, 2));
}
init().catch(console.error);
Sequelize 要点:
- 支持模型钩子、验证、作用域、乐观锁 (
version) 等高级特性; - 生产环境使用
sequelize-cli维护 migration,避免sync({ alter: true })在多节点环境下带来不可控变更; - 当 SQL 需要优化时可使用
sequelize.query()下发原生语句,同时保留模型定义。
ORM 示例:Prisma
Prisma 使用 Schema 文件定义模型,并生成类型安全的客户端,适合 TypeScript 项目。
npm install prisma @prisma/client
npx prisma init
prisma/schema.prisma:
datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}
generator client {
provider = "prisma-client-js"
}
model User {
id Int @id @default(autoincrement())
email String @unique
nickname String?
orders Order[]
}
model Order {
id Int @id @default(autoincrement())
status String @default("pending")
total Decimal
user User @relation(fields: [userId], references: [id])
userId Int
createdAt DateTime @default(now())
}
执行 npx prisma migrate dev --name init 生成表,然后:
// prisma-demo.ts
import { PrismaClient } from '@prisma/client';
const prisma = new PrismaClient();
async function main() {
const user = await prisma.user.create({
data: {
email: 'dev@test.com',
nickname: 'dev',
orders: {
create: [{ total: 299.9 }, { total: 399.9, status: 'paid' }],
},
},
include: { orders: true },
});
console.log(user);
}
main().finally(() => prisma.$disconnect());
Prisma 会根据 Schema 自动生成类型定义,支持中间件(如审计日志)、连接池复用、query event hook 等,能在保持 SQL 控制的同时保证开发效率。
MongoDB:文档型场景
内容管理、社交 Feed、日志或灵活 Schema 使用 MongoDB 更方便。可用官方驱动或 mongoose。
npm install mongoose
// mongo.js
const mongoose = require('mongoose');
await mongoose.connect('mongodb://localhost:27017/forum', {
maxPoolSize: 20,
});
const PostSchema = new mongoose.Schema({
title: String,
content: String,
tags: [String],
createdAt: { type: Date, default: Date.now },
});
PostSchema.index({ tags: 1, createdAt: -1 });
const Post = mongoose.model('Post', PostSchema);
async function listPosts(tag) {
return Post.find({ tags: tag }).sort({ createdAt: -1 }).limit(20).lean();
}
要点:
- 使用
maxPoolSize限制连接数量,搭配连接监控; - 查询链路最后调用
lean(),减少 Mongoose 的 getter/setter 开销; - 对高频字段建立复合索引(如 tag + createdAt),避免全表扫描;
- 可以利用副本集多节点读 (
readPreference=secondaryPreferred) 用于报表或实时推荐。
Redis:缓存 + 消息
Redis 擅长缓存、会话、计数器、Pub/Sub。ioredis 支持 Cluster/Sentinel、自动重连。
npm install ioredis
// redis.js
const Redis = require('ioredis');
const redis = new Redis({ host: '127.0.0.1', port: 6379 });
async function cacheProfile(userId, profile) {
await redis.set(`user:${userId}`, JSON.stringify(profile), 'EX', 3600);
}
async function getProfile(userId) {
const cached = await redis.get(`user:${userId}`);
return cached ? JSON.parse(cached) : null;
}
// 分布式锁:防止重复下单
async function acquireLock(key, ttl = 5000) {
const lockId = Date.now() + Math.random().toString(16).slice(2);
const ok = await redis.set(key, lockId, 'NX', 'PX', ttl);
return ok ? lockId : null;
}
// Pub/Sub:订单状态通知
const sub = new Redis();
sub.subscribe('order-status');
sub.on('message', (_, payload) => {
console.log('Order update:', payload);
});
const pub = new Redis();
pub.publish('order-status', JSON.stringify({ orderId: 1, status: 'shipped' }));
实践建议:
- 缓存必须设置 TTL,配合随机过期(
EX + Math.random())降低雪崩; - 需要持久化的数据配合 AOF/RDB 机制,或使用 Redis Cluster 提高可用性;
- Pub/Sub 只保证尽力而为,若需可靠消费可用 Redis Streams (
XADD/XREADGROUP); - 全局自增 ID 可用
INCR,排行榜使用ZADD/ZREVRANGE。
通过合理地将 MySQL 的强事务、MongoDB 的灵活文档与 Redis 的高速缓存结合,Node 应用可以 3 层分工:写入链路首先落盘 MySQL/MongoDB,读取优先命中 Redis,错失再回源;实时事件通过 Redis Pub/Sub 或 Stream 推送到 WebSocket/SSE,与上文实时通信部分协同,构建端到端的高并发服务。
Node 主进程、子进程与 Cluster
child_process 基础
Node 默认单线程执行 JS,如需利用多核或调用系统命令,可使用 child_process 模块创建子进程:
spawn(command, args, options):最常用,返回流式stdout/stderr,适合长期任务;exec(command, options, callback):将输出缓存在内存,适合短命令;fork(modulePath, args, options):专门用来运行 Node 子脚本,自动建立主子进程 IPC 通道。
// child.js
setInterval(() => {
process.send({ type: 'tick', ts: Date.now() });
}, 1000);
process.on('message', (msg) => {
if (msg === 'stop') process.exit(0);
});
// master.js
const { fork } = require('child_process');
const child = fork('./child.js');
child.on('message', (msg) => {
console.log('Child message:', msg);
if (msg.type === 'tick' && Math.random() > 0.8) {
child.send('stop');
}
});
child.on('exit', (code) => console.log('child exit', code));
fork 创建的管道是基于 IPC 的,process.send 和 child.on('message') 传递的对象会自动序列化。在 spawn/exec 场景也可以通过 child.stdin.write、child.stdout.on('data') 来实现流式通信。
共享端口与 Cluster
单个 Node 进程无法使用多核。cluster 模块封装了 child_process.fork,允许多个 Worker 共享同一个服务器端口;Master 负责监听端口并将连接分发给 Worker(在 Linux 上通过 SO_REUSEPORT 或内部 Round-Robin)。
// cluster-server.js
const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isMaster) {
const cpuCount = os.cpus().length;
for (let i = 0; i < cpuCount; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code) => {
console.log(`Worker ${worker.process.pid} died`, code);
cluster.fork(); // 自动重启
});
} else {
http
.createServer((req, res) => {
res.end(`Handled by worker ${process.pid}\n`);
})
.listen(3000);
}
主进程中可以监听 cluster.on('online')、cluster.on('message') 等事件,Worker 也可通过 process.send 与主进程交互,例如上报负载、健康状态。
Cluster 与子进程通信
- Master -> Worker:
worker.send(payload); - Worker -> Master:
process.send(payload);Master 监听cluster.on('message', (worker, msg) => {}); - Worker 间通信需要由 Master 做中转,或使用 Redis/Message Queue;
- 对于 CPU 密集型任务,
worker_threads是更轻量的选择,且共享内存;Cluster 侧重端口共享+进程隔离。
实践建议
- 子进程默认与主进程同生命周期,确保监听
exit/error以便重启或降级; - IPC 消息会序列化,避免传输巨大对象,可使用共享内存文件或 Socket;
- Cluster 模式下的 Worker 是独立进程,状态(缓存、连接)不共享,需在外部存储会话(如 Redis);
- 在容器环境中可考虑使用 PM2、forever 等进程管理工具封装 cluster,附带日志、重启策略和健康检查。
通过 child_process + cluster,Node 可以在保持单线程模型的前提下复用多核、隔离崩溃,并与系统命令或其他 Node 子脚本协作,构建更可靠的服务。
Express 与 Koa 项目搭建
Express:经典 MVC 框架
Express 是 Node 上使用最广泛的 Web 框架之一,提供路由、中间件机制,可以方便地组织 MVC/REST 项目。
初始化步骤
mkdir express-demo && cd express-demo
npm init -y
npm install express dotenv morgan
目录结构示例:
express-demo/
├─ app.js # 创建 express 实例
├─ routes/
│ ├─ index.js
│ └─ users.js
├─ controllers/
│ └─ user.controller.js
├─ models/
│ └─ user.model.js # 可使用 Sequelize/Prisma 等 ORM
└─ middlewares/
└─ auth.js
app.js:
const express = require('express');
const morgan = require('morgan');
require('dotenv').config();
const app = express();
app.use(express.json());
app.use(morgan('dev'));
const userRouter = require('./routes/users');
app.use('/api/users', userRouter);
app.use((err, req, res, next) => {
console.error(err);
res.status(err.status || 500).json({ message: err.message });
});
app.listen(process.env.PORT || 3000, () => {
console.log('Express server running');
});
routes/users.js:
const router = require('express').Router();
const userController = require('../controllers/user.controller');
const auth = require('../middlewares/auth');
router.get('/', auth.optional, userController.list);
router.post('/', auth.required, userController.create);
module.exports = router;
controllers/user.controller.js:
const User = require('../models/user.model');
exports.list = async (req, res, next) => {
try {
const users = await User.findAll();
res.json(users);
} catch (err) {
next(err);
}
};
Express 模型层通常由 ORM/ODM 提供(Sequelize/Prisma/Mongoose 等)。Express 引入中间件栈,因此可以轻松插入鉴权、日志、限流等逻辑,适合 REST API 和 SSR 应用。
Koa:更轻量的中间件组合
Koa 由 Express 团队打造,使用 async/await + 洋葱模型中间件,核心更轻,适合按需组合。
初始化步骤
mkdir koa-demo && cd koa-demo
npm init -y
npm install koa koa-router koa-body koa-logger dotenv
目录结构示例:
koa-demo/
├─ app.js
├─ routes/
│ └─ user.route.js
├─ controllers/
│ └─ user.controller.js
├─ services/
│ └─ user.service.js
└─ models/ # 可放 ORM 定义
app.js:
const Koa = require('koa');
const Router = require('koa-router');
const bodyParser = require('koa-body');
const logger = require('koa-logger');
require('dotenv').config();
const app = new Koa();
const router = new Router({ prefix: '/api' });
app.use(logger());
app.use(bodyParser());
const userRoute = require('./routes/user.route');
router.use('/users', userRoute.routes(), userRoute.allowedMethods());
app.use(router.routes()).use(router.allowedMethods());
app.on('error', (err, ctx) => {
console.error('server error', err, ctx);
});
app.listen(process.env.PORT || 4000, () => {
console.log('Koa server running');
});
routes/user.route.js:
const Router = require('koa-router');
const controller = require('../controllers/user.controller');
const authGuard = require('../middlewares/auth');
const router = new Router();
router.get('/', controller.list);
router.post('/', authGuard, controller.create);
module.exports = router;
Koa controller 示例:
const userService = require('../services/user.service');
exports.list = async (ctx, next) => {
const users = await userService.findAll();
ctx.body = users;
};
Koa 推荐的“模型”组织方式通常是 Service/Repository 层来封装业务逻辑,结合 async/await 洋葱模型,可以对请求前后做流式处理(如响应压缩、错误捕获)。配合 koa-compose、koa-jwt 等中间件,可快速搭建 GraphQL、REST、或 BFF 服务。
洋葱皮模型原理
Koa 的中间件本质是一个 async 函数组成的数组(middleware stack),利用 await next() 将执行权传给下个中间件,再在 next 返回后继续向外执行,形成“先递进、后回溯”的洋葱模型。koa-compose 的核心实现如下(简化版):
function compose(middlewares) {
return function (ctx) {
return dispatch(0);
function dispatch(i) {
const fn = middlewares[i];
if (!fn) return Promise.resolve();
return Promise.resolve(fn(ctx, () => dispatch(i + 1)));
}
};
}
示例:
app.use(async (ctx, next) => {
console.log('A before');
await next();
console.log('A after');
});
app.use(async (ctx, next) => {
console.log('B before');
await next();
console.log('B after');
});
app.use(async (ctx) => {
console.log('handler');
ctx.body = 'OK';
});
输出顺序:A before -> B before -> handler -> B after -> A after。因此,想要在请求完成后做收尾工作(日志、事务、响应包装),只需把逻辑写在 await next() 之后即可;想在进入下层之前预处理(鉴权、解析)则写在 await next() 之前。该模型通过 Promise/async 递归即可实现,无需复杂的状态机。
Express vs Koa
- 中间件机制:Express 基于 callback 栈;Koa 基于 async/await 洋葱模型,中间件可以
await next()后在返回时再处理。 - 生态:Express 中间件生态庞大;Koa 核心轻量,但需要手动选择中间件搭建功能。
- 默认功能:Express 自带大量便捷方法(
res.json、express.static);Koa 只提供核心,需要额外引入包。 - 错误处理:Koa 的 try/catch 搭配
app.on('error')能捕获 async 中的异常;Express 通过next(err)传递到错误处理中间件。
无论是 Express 还是 Koa,都可以将前述数据库层(MySQL/MongoDB/Redis)和实时能力(WebSocket/SSE)整合进来,按照路由 -> 控制器 -> 服务 -> 模型的分层设计,构建高可维护的 Node Web 项目。
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://joyjs.cn/archives/4767