如今的服务器、PC,甚至是平板电脑和手机,几乎没有单核的CPU。
单线程的Node.js代码意味着同一时刻只能使用一个CPU核心,那么Node.js如何发挥多核CPU的性能呢?
我想多进程至少是其中一个方案。
child_process是Node.js提供的子进程模块。
child_process模块提供了7个创建子进程的方法,其中4个是异步方法,3个是同步方法。异步方法分别是child_process.exec()、child_process.execFile()、child_process.fork()、child_process.spawn();同步方法分别是child_process.execSync()、child_process.execFileSync()、child_process.spawnSync()。
我们首先来看一下这7个方法的源码:
child_process.exec():
exports.exec = function(command /*, options, callback*/) {
var opts = normalizeExecArgs.apply(null, arguments);
return exports.execFile(opts.file,
opts.args,
opts.options,
opts.callback);
};
child_process.execFile():
/**
 * Spawns `file` through spawn(), accumulates the child's stdout and stderr,
 * and reports the outcome to an optional callback as (error, stdout, stderr).
 *
 * Optional positional arguments, in order: an `args` array, an `options`
 * object, and a `callback` function. A null/undefined placeholder is
 * skipped in the `args` and `options` positions.
 *
 * Option defaults set below: encoding 'utf8', timeout 0, maxBuffer
 * 200 * 1024, killSignal 'SIGTERM', cwd null, env null.
 *
 * @param file - Passed to spawn() as the program to run.
 * @returns The child object returned by spawn().
 * @throws {TypeError} When a second argument is supplied that is not an
 *   array, null/undefined, an object, or a function.
 */
exports.execFile = function(file /*, args, options, callback*/) {
var args = [], callback;
// Defaults; caller-supplied options are merged over these further down.
var options = {
encoding: 'utf8',
timeout: 0,
maxBuffer: 200 * 1024,
killSignal: 'SIGTERM',
cwd: null,
env: null
};
// Parse the optional positional parameters.
// `pos` walks the argument list; each recognised (or null/undefined
// placeholder) argument advances it by one.
var pos = 1;
if (pos < arguments.length && Array.isArray(arguments[pos])) {
args = arguments[pos++];
} else if (pos < arguments.length && arguments[pos] == null) {
pos++;
}
if (pos < arguments.length && typeof arguments[pos] === 'object') {
options = util._extend(options, arguments[pos++]);
} else if (pos < arguments.length && arguments[pos] == null) {
pos++;
}
if (pos < arguments.length && typeof arguments[pos] === 'function') {
callback = arguments[pos++];
}
// `pos` never moved although extra arguments were given: the second
// argument matched none of the accepted shapes above.
if (pos === 1 && arguments.length > 1) {
throw new TypeError('Incorrect value of args option');
}
// Only this subset of the options is forwarded to spawn().
var child = spawn(file, args, {
cwd: options.cwd,
env: options.env,
gid: options.gid,
uid: options.uid,
windowsVerbatimArguments: !!options.windowsVerbatimArguments
});
var encoding;
var _stdout;
var _stderr;
// With a recognised, non-'buffer' encoding the output is accumulated as
// strings; otherwise chunks are collected in arrays and concatenated into
// Buffers when the child exits.
if (options.encoding !== 'buffer' && Buffer.isEncoding(options.encoding)) {
encoding = options.encoding;
_stdout = '';
_stderr = '';
} else {
_stdout = [];
_stderr = [];
encoding = null;
}
// Running totals of chunk.length, compared against options.maxBuffer.
var stdoutLen = 0;
var stderrLen = 0;
// Set by kill() below; surfaces as `killed` on the error object.
var killed = false;
// Ensures exithandler's body runs at most once.
var exited = false;
var timeoutId;
// Error recorded by errorhandler, kill(), or a maxBuffer overflow; when set
// it is the error passed to the callback.
var ex = null;
// Final reporting step. Registered for the child's 'close' event and also
// called directly (with no arguments) from errorhandler() and kill().
function exithandler(code, signal) {
if (exited) return;
exited = true;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
if (!callback) return;
// merge chunks
var stdout;
var stderr;
if (!encoding) {
stdout = Buffer.concat(_stdout);
stderr = Buffer.concat(_stderr);
} else {
stdout = _stdout;
stderr = _stderr;
}
if (ex) {
// Will be handled later
} else if (code === 0 && signal === null) {
// Success: exit code 0, no signal, and no previously recorded error.
callback(null, stdout, stderr);
return;
}
// Failure path: rebuild the command line for the error report.
var cmd = file;
if (args.length !== 0)
cmd += ' ' + args.join(' ');
if (!ex) {
ex = new Error('Command failed: ' + cmd + '\n' + stderr);
ex.killed = child.killed || killed;
// Negative codes are translated through uv.errname() — presumably to a
// libuv error name; confirm against the `uv` binding.
ex.code = code < 0 ? uv.errname(code) : code;
ex.signal = signal;
}
ex.cmd = cmd;
callback(ex, stdout, stderr);
}
// 'error' listener on the child: records the error, destroys the output
// streams, then reports through exithandler().
function errorhandler(e) {
ex = e;
if (child.stdout)
child.stdout.destroy();
if (child.stderr)
child.stderr.destroy();
exithandler();
}
// Destroys the output streams and sends options.killSignal to the child.
// Used by the timeout timer and by the maxBuffer checks.
function kill() {
if (child.stdout)
child.stdout.destroy();
if (child.stderr)
child.stderr.destroy();
killed = true;
try {
child.kill(options.killSignal);
} catch (e) {
// child.kill() threw: record the error and report immediately.
ex = e;
exithandler();
}
}
// A timer is armed only for a positive timeout.
if (options.timeout > 0) {
timeoutId = setTimeout(function() {
kill();
timeoutId = null;
}, options.timeout);
}
if (child.stdout) {
if (encoding)
child.stdout.setEncoding(encoding);
child.stdout.addListener('data', function(chunk) {
// NOTE(review): once setEncoding() is applied chunks are presumably
// strings, so this counts string length rather than bytes — confirm.
stdoutLen += chunk.length;
if (stdoutLen > options.maxBuffer) {
// The chunk that crosses the limit is not appended.
ex = new Error('stdout maxBuffer exceeded');
kill();
} else {
if (!encoding)
_stdout.push(chunk);
else
_stdout += chunk;
}
});
}
if (child.stderr) {
if (encoding)
child.stderr.setEncoding(encoding);
child.stderr.addListener('data', function(chunk) {
// Same accounting as for stdout, with a separate counter.
stderrLen += chunk.length;
if (stderrLen > options.maxBuffer) {
ex = new Error('stderr maxBuffer exceeded');
kill();
} else {
if (!encoding)
_stderr.push(chunk);
else
_stderr += chunk;
}
});
}
child.addListener('close', exithandler);
child.addListener('error', errorhandler);
return child;
};
child_process.fork():
exports.fork = function(modulePath /*, args, options*/) {
// Get options and args arguments.
var options, args, execArgv;
if (Array.isArray(arguments[1])) {
args = arguments[1];
options = util._extend({}, arguments[2]);
} else if (arguments[1] && typeof arguments[1] !== 'object') {
throw new TypeError('Incorrect value of args option');
} else {
args = [];
options = util._extend({}, arguments[1]);
}
// Prepare arguments for fork:
execArgv = options.execArgv || process.execArgv;
if (execArgv === process.execArgv && process._eval != null) {
const index = execArgv.lastIndexOf(process._eval);
if (index > 0) {
// Remove the -e switch to avoid fork bombing ourselves.
execArgv = execArgv.slice();
execArgv.splice(index - 1, 2);
}
}
args = execArgv.concat([modulePath], args);
// Leave stdin open for the IPC channel. stdout and stderr should be the
// same as the parent's if silent isn't set.
options.stdio = options.silent ? ['pipe', 'pipe', 'pipe', 'ipc'] :
[0, 1, 2, 'ipc'];
options.execPath = options.execPath || process.execPath;
return spawn(options.execPath, args, options);
};