您好,登錄后才能下訂單哦!
本文介紹了Node.Js中實現端口重用原理詳解,分享給大家,具體如下:
起源,從官方實例中看多進程共用端口
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died`); }); } else { http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`Worker ${process.pid} started`); }
執行結果:
$ node server.js
Master 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started
了解http.js模塊:
我們都只有要創建一個http服務,必須引用http模塊,http模塊最終會調用net.js實現網絡服務
// lib/net.js 'use strict'; ... Server.prototype.listen = function(...args) { ... if (options instanceof TCP) { this._handle = options; this[async_id_symbol] = this._handle.getAsyncId(); listenInCluster(this, null, -1, -1, backlogFromArgs); // 注意這個方法調用了cluster模式下的處理辦法 return this; } ... }; function listenInCluster(server, address, port, addressType,backlog, fd, exclusive) { // 如果是master 進程或者沒有開啟cluster模式直接啟動listen if (cluster.isMaster || exclusive) { //_listen2,細心的人一定會發現為什么是listen2而不直接使用listen // _listen2 包裹了listen方法,如果是Worker進程,會調用被hack后的listen方法,從而避免出錯端口被占用的錯誤 server._listen2(address, port, addressType, backlog, fd); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // 是fork 出來的進程,獲取master上的handel,并且監聽, // 現在是不是很好奇_getServer方法做了什么 cluster._getServer(server, serverQuery, listenOnMasterHandle); } ...
答案很快就可以通過cluster._getServer 這個函數找到
// lib/internal/cluster/child.js cluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: 'queryServer', // 關鍵點:構建一個queryServer的消息 index: indexes[indexesKey], data: null }, options); message.address = address; // 發送queryServer消息給master進程,master 在收到這個消息后,會創建一個開始一個server,并且listen send(message, (reply, handle) => { rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = address && address.port || options.port; send(message); }); }; //... // Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; // 這里hack 了listen方法 // 子進程調用的listen方法,就是這個,直接返回0,所以不會報端口被占用的錯誤 function listen(backlog) { return 0; } // ... const handle = { close, listen, ref: noop, unref: noop }; handles[key] = handle; // 這個cb 函數是net.js 中的listenOnMasterHandle 方法 cb(0, handle); } // lib/net.js /* function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); server._handle = handle; // _listen2 函數中,調用的handle.listen方法,也就是上面被hack的listen server._listen2(address, port, addressType, backlog, fd); } */
master進程收到queryServer消息后進行啟動服務
// lib/internal/cluster/master.js function queryServer(worker, message) { const args = [ message.address, message.port, message.addressType, message.fd, message.index ]; const key = args.join(':'); var handle = handles[key]; // 如果地址沒被監聽過,通過RoundRobinHandle監聽開啟服務 if (handle === undefined) { var constructor = RoundRobinHandle; if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } handles[key] = handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); } // 如果地址已經被監聽,直接綁定handel到已經監聽到服務上,去消費請求 // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ errno: errno, key: key, ack: message.seq, data: handles[key].data }, reply); if (errno) delete handles[key]; // Gives other workers a chance to retry. send(worker, reply, handle); }); }
看到這一步,已經很明顯,我們知道了多進行端口共享的實現原理
那現在問題來了,既然Worker進程是如何獲取到master進程監聽服務接收到的connect呢?
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; // 監聽onconnection方法 this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } RoundRobinHandle.prototype.add = function (worker, send) { // ... }; RoundRobinHandle.prototype.remove = function (worker) { // ... }; RoundRobinHandle.prototype.distribute = function (err, handle) { // 負載均衡地挑選出一個worker this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function (worker) { const handle = this.handles.shift(); const message = { act: 'newconn', key: this.key }; // 向work進程其發送newconn內部消息和客戶端的句柄handle sendHelper(worker.process, message, handle, (reply) => { // ... this.handoff(worker); }); };
下面讓我們看看Worker進程接收到newconn消息后進行了哪些操作
// lib/child.js function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } // Round-robin connection. // 接收連接,并且處理 function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
總結
分享出于共享學習的目的,如有錯誤,歡迎大家留言指導,不喜勿噴。也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。