在开发与第三方服务集成的应用程序或机器人时,通常需要将本地开发服务器暴露在 Internet 上以接收 Webhook 消息。要实现这一点,需要为本地服务器创建 HTTP 隧道。本文演示了如何使用 WebSocket 和 Node.js 流构建HTTP隧道工具并传输大数据。
为什么要部署自己的HTTP隧道服务
许多在线服务提供 HTTP 隧道,例如 ngrok,它提供付费的固定公共域来连接本地服务器。它还提供免费的套餐,但仅提供随机域,每次客户端重新启动时都会更改,使得在第三方服务中保存域名不方便。
要获得固定域,您可以在自己的服务器上部署HTTP隧道。ngrok还提供了开源版本用于服务器端部署,但它是一个旧版本1.x,存在一些严重的可靠性问题,不建议用于生产。
此外,使用自己的服务器,您可以确保数据安全。
Lite HTTP Tunnel项目简介
Lite HTTP Tunnel 是一项最近开发的HTTP隧道服务,可以自行托管。您可以使用 Github 存储库中的Deploy按钮部署它并免费获取固定域。
它基于 Express.js 和 Socket.io 构建,仅使用几行代码。它使用 WebSocket 将 HTTP / HTTPS 请求从公共服务器流式传输到本地服务器。
实现
步骤1:在服务器和客户端之间建立 WebSocket 连接
要在服务器端支持 WebSocket 连接,我们使用 socket.io:
const http = require('http');
const express = require('express');
const { Server } = require('socket.io');
const app = express();
const httpServer = http.createServer(app);
const io = new Server(httpServer);
let connectedSocket = null;
io.on('connection', (socket) => {
console.log('client connected');
connectedSocket = socket;
const onMessage = (message) => {
if (message === 'ping') {
socket.send('pong');
}
}
const onDisconnect = (reason) => {
console.log('client disconnected: ', reason);
connectedSocket = null;
socket.off('message', onMessage);
socket.off('error', onError);
};
const onError = (e) => {
connectedSocket = null;
socket.off('message', onMessage);
socket.off('disconnect', onDisconnect);
};
socket.on('message', onMessage);
socket.once('disconnect', onDisconnect);
socket.once('error', onError);
});
httpServer.listen(process.env.PORT);
要在客户端连接 WebSocket:
const { io } = require('socket.io-client');
let socket = null;
function initClient(options) {
socket = io(options.server, {
transports: ["websocket"],
auth: {
token: options.jwtToken,
},
});
socket.on('connect', () => {
if (socket.connected) {
console.log('client connect to server successfully');
}
});
socket.on('connect_error', (e) => {
console.log('connect error', e && e.message);
});
socket.on('disconnect', () => {
console.log('client disconnected');
});
}
第二步:使用 JWT Token 保护 WebSocket 连接
在服务器端,我们使用 socket.io
中间件拒绝无效连接:
const jwt = require('jsonwebtoken');
io.use((socket, next) => {
if (connectedSocket) {
return next(new Error('Connected error'));
}
if (!socket.handshake.auth || !socket.handshake.auth.token){
next(new Error('Authentication error'));
}
jwt.verify(socket.handshake.auth.token, process.env.SECRET_KEY, function(err, decoded) {
if (err) {
return next(new Error('Authentication error'));
}
if (decoded.token !== process.env.VERIFY_TOKEN) {
return next(new Error('Authentication error'));
}
next();
});
});
第 3 步:将请求从服务器传输到客户端
要将请求数据从服务器发送到客户端,我们使用可写流。以下代码实现了 SocketRequest
类,该类扩展了Node.js内置 stream
模块中的 Writable
类。
const { Writable } = require('stream');
class SocketRequest extends Writable {
constructor({ socket, requestId, request }) {
super();
this._socket = socket;
this._requestId = requestId;
this._socket.emit('request', requestId, request);
}
_write(chunk, encoding, callback) {
this._socket.emit('request-pipe', this._requestId, chunk);
this._socket.conn.once('drain', () => {
callback();
});
}
_writev(chunks, callback) {
this._socket.emit('request-pipes', this._requestId, chunks);
this._socket.conn.once('drain', () => {
callback();
});
}
_final(callback) {
this._socket.emit('request-pipe-end', this._requestId);
this._socket.conn.once('drain', () => {
callback();
});
}
_destroy(e, callback) {
if (e) {
this._socket.emit('request-pipe-error', this._requestId, e && e.message);
this._socket.conn.once('drain', () => {
callback();
});
return;
}
callback();
}
}
app.use('/', (req, res) => {
if (!connectedSocket) {
res.status(404);
res.send('Not Found');
return;
}
const requestId = uuidV4();
const socketRequest = new SocketRequest({
socket: connectedSocket,
requestId,
request: {
method: req.method,
headers: { ...req.headers },
path: req.url,
},
});
const onReqError = (e) => {
socketRequest.destroy(new Error(e || 'Aborted'));
}
req.once('aborted', onReqError);
req.once('error', onReqError);
req.pipe(socketRequest);
req.once('finish', () => {
req.off('aborted', onReqError);
req.off('error', onReqError);
});
// ...
});
要在客户端接收请求数据,我们使用可读流。以下代码实现了 SocketRequest
类,该类扩展了Node.js内置 stream
模块中的 Readable
类。
onst stream = require('stream');
class SocketRequest extends stream.Readable {
constructor({ socket, requestId }) {
super();
this._socket = socket;
this._requestId = requestId;
const onRequestPipe = (requestId, data) => {
if (this._requestId === requestId) {
this.push(data);
}
};
const onRequestPipes = (requestId, data) => {
if (this._requestId === requestId) {
data.forEach((chunk) => {
this.push(chunk);
});
}
};
const onRequestPipeError = (requestId, error) => {
if (this._requestId === requestId) {
this._socket.off('request-pipe', onRequestPipe);
this._socket.off('request-pipes', onRequestPipes);
this._socket.off('request-pipe-error', onRequestPipeError);
this._socket.off('request-pipe-end', onRequestPipeEnd);
this.destroy(new Error(error));
}
};
const onRequestPipeEnd = (requestId, data) => {
if (this._requestId === requestId) {
this._socket.off('request-pipe', onRequestPipe);
this._socket.off('request-pipes', onRequestPipes);
this._socket.off('request-pipe-error', onRequestPipeError);
this._socket.off('request-pipe-end', onRequestPipeEnd);
if (data) {
this.push(data);
}
this.push(null);
}
};
this._socket.on('request-pipe', onRequestPipe);
this._socket.on('request-pipes', onRequestPipes);
this._socket.on('request-pipe-error', onRequestPipeError);
this._socket.on('request-pipe-end', onRequestPipeEnd);
}
_read() {}
}
socket.on('request', (requestId, request) => {
console.log(`${request.method}: `, request.path);
request.port = options.port;
request.hostname = options.host;
const socketRequest = new SocketRequest({
requestId,
socket: socket,
});
const localReq = http.request(request);
socketRequest.pipe(localReq);
const onSocketRequestError = (e) => {
socketRequest.off('end', onSocketRequestEnd);
localReq.destroy(e);
};
const onSocketRequestEnd = () => {
socketRequest.off('error', onSocketRequestError);
};
socketRequest.once('error', onSocketRequestError);
socketRequest.once('end', onSocketRequestEnd);
// ...
});
第四步:从客户端传输响应到服务器
为将响应数据发送到隧道服务器,我们将使用流模块创建可写 stream
。
const stream = require('stream');
class SocketResponse extends stream.Writable {
constructor({ socket, responseId }) {
super();
this._socket = socket;
this._responseId = responseId;
}
_write(chunk, encoding, callback) {
this._socket.emit('response-pipe', this._responseId, chunk);
this._socket.io.engine.once('drain', () => {
callback();
});
}
_writev(chunks, callback) {
this._socket.emit('response-pipes', this._responseId, chunks);
this._socket.io.engine.once('drain', () => {
callback();
});
}
_final(callback) {
this._socket.emit('response-pipe-end', this._responseId);
this._socket.io.engine.once('drain', () => {
callback();
});
}
_destroy(e, callback) {
if (e) {
this._socket.emit('response-pipe-error', this._responseId, e && e.message);
this._socket.io.engine.once('drain', () => {
callback();
});
return;
}
callback();
}
writeHead(statusCode, statusMessage, headers) {
this._socket.emit('response', this._responseId, {
statusCode,
statusMessage,
headers,
});
}
}
socket.on('request', (requestId, request) => {
// ...stream request and send request to local server...
const onLocalResponse = (localRes) => {
localReq.off('error', onLocalError);
const socketResponse = new SocketResponse({
responseId: requestId,
socket: socket,
});
socketResponse.writeHead(
localRes.statusCode,
localRes.statusMessage,
localRes.headers
);
localRes.pipe(socketResponse);
};
const onLocalError = (error) => {
console.log(error);
localReq.off('response', onLocalResponse);
socket.emit('request-error', requestId, error && error.message);
socketRequest.destroy(error);
};
localReq.once('error', onLocalError);
localReq.once('response', onLocalResponse);
});
为了在隧道服务器中获取响应数据,我们将创建一个可读流。
class SocketResponse extends Readable {
constructor({ socket, responseId }) {
super();
this._socket = socket;
this._responseId = responseId;
const onResponse = (responseId, data) => {
if (this._responseId === responseId) {
this._socket.off('response', onResponse);
this._socket.off('request-error', onRequestError);
this.emit('response', data.statusCode, data.statusMessage, data.headers);
}
}
const onResponsePipe = (responseId, data) => {
if (this._responseId === responseId) {
this.push(data);
}
};
const onResponsePipes = (responseId, data) => {
if (this._responseId === responseId) {
data.forEach((chunk) => {
this.push(chunk);
});
}
};
const onResponsePipeError = (responseId, error) => {
if (this._responseId !== responseId) {
return;
}
this._socket.off('response-pipe', onResponsePipe);
this._socket.off('response-pipes', onResponsePipes);
this._socket.off('response-pipe-error', onResponsePipeError);
this._socket.off('response-pipe-end', onResponsePipeEnd);
this.destroy(new Error(error));
};
const onResponsePipeEnd = (responseId, data) => {
if (this._responseId !== responseId) {
return;
}
if (data) {
this.push(data);
}
this._socket.off('response-pipe', onResponsePipe);
this._socket.off('response-pipes', onResponsePipes);
this._socket.off('response-pipe-error', onResponsePipeError);
this._socket.off('response-pipe-end', onResponsePipeEnd);
this.push(null);
};
const onRequestError = (requestId, error) => {
if (requestId === this._responseId) {
this._socket.off('request-error', onRequestError);
this._socket.off('response', onResponse);
this._socket.off('response-pipe', onResponsePipe);
this._socket.off('response-pipes', onResponsePipes);
this._socket.off('response-pipe-error', onResponsePipeError);
this._socket.off('response-pipe-end', onResponsePipeEnd);
this.emit('requestError', error);
}
};
this._socket.on('response', onResponse);
this._socket.on('response-pipe', onResponsePipe);
this._socket.on('response-pipes', onResponsePipes);
this._socket.on('response-pipe-error', onResponsePipeError);
this._socket.on('response-pipe-end', onResponsePipeEnd);
this._socket.on('request-error', onRequestError);
}
_read(size) {}
}
app.use('/', (req, res) => {
// ... stream request to tunnel client
const onResponse = (statusCode, statusMessage, headers) => {
socketRequest.off('requestError', onRequestError)
res.writeHead(statusCode, statusMessage, headers);
};
socketResponse.once('requestError', onRequestError)
socketResponse.once('response', onResponse);
socketResponse.pipe(res);
const onSocketError = () => {
res.end(500);
};
socketResponse.once('error', onSocketError);
connectedSocket.once('close', onSocketError)
res.once('close', () => {
connectedSocket.off('close', onSocketError);
socketResponse.off('error', onSocketError);
});
});
在完成以上所有步骤后,我们已经支持将 HTTP 请求流式传输到本地计算机,并从本地服务器发送响应到原始请求。这是一种轻量级的解决方案,但稳定性高且易于部署在任何 “Node.js” 环境中。
第六步:部署HTTP隧道服务
我们可以将HTTP隧道服务部署到云提供商(如 Heroku)。项目 Lite HTTP Tunnel 在Github存储库中包含一个 Heroku/Render 按钮,可让您快速将服务部署到 Heroku/Render。
更多信息
在本文中,我们学习了如何基于 WebSocket 和 Node.js 流构建HTTP隧道工具。使用此工具,我们可以将本地开发服务器暴露到 Internet,并从第三方服务接收 Webhook 消息。我们还了解了如何使用 Node.js 流在客户端和服务器之间传输大量数据。
英文文章:https://dev.to/embbnux/building-a-http-tunnel-with-websocket-and-nodejs-4bp5