jsInvokeMessageProcessor.ts 18 KB


  1. ///
  2. /// Copyright © 2016-2023 The Thingsboard Authors
  3. ///
  4. /// Licensed under the Apache License, Version 2.0 (the "License");
  5. /// you may not use this file except in compliance with the License.
  6. /// You may obtain a copy of the License at
  7. ///
  8. /// http://www.apache.org/licenses/LICENSE-2.0
  9. ///
  10. /// Unless required by applicable law or agreed to in writing, software
  11. /// distributed under the License is distributed on an "AS IS" BASIS,
  12. /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. /// See the License for the specific language governing permissions and
  14. /// limitations under the License.
  15. ///
  16. import config from 'config';
  17. import { _logger } from '../config/logger';
  18. import { JsExecutor, TbScript } from './jsExecutor';
  19. import { performance } from 'perf_hooks';
  20. import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits, isNotUUID } from './utils';
  21. import { IQueue } from '../queue/queue.models';
  22. import {
  23. JsCompileRequest,
  24. JsCompileResponse,
  25. JsInvokeRequest,
  26. JsInvokeResponse,
  27. JsReleaseRequest,
  28. JsReleaseResponse,
  29. RemoteJsRequest,
  30. RemoteJsResponse,
  31. TbMessage
  32. } from './jsExecutor.models';
  33. import Long from 'long';
  34. const COMPILATION_ERROR = 0;
  35. const RUNTIME_ERROR = 1;
  36. const TIMEOUT_ERROR = 2;
  37. const NOT_FOUND_ERROR = 3;
  38. const statFrequency = Number(config.get('script.stat_print_frequency'));
  39. const memoryUsageTraceFrequency = Number(config.get('script.memory_usage_trace_frequency'));
  40. const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
  41. const useSandbox = config.get('script.use_sandbox') === 'true';
  42. const maxActiveScripts = Number(config.get('script.max_active_scripts'));
  43. const slowQueryLogMs = Number(config.get('script.slow_query_log_ms'));
  44. const slowQueryLogBody = config.get('script.slow_query_log_body') === 'true';
  45. const maxResultSize = Number(config.get('js.max_result_size'));
  46. export class JsInvokeMessageProcessor {
  47. private logger = _logger(`JsInvokeMessageProcessor`);
  48. private producer: IQueue;
  49. private executor = new JsExecutor(useSandbox);
  50. private scriptMap = new Map<string, TbScript>();
  51. private scriptIds: string[] = [];
  52. private executedScriptIdsCounter: number[] = [];
  53. private executedScriptsCounter = 0;
  54. private lastStatTime = performance.now();
  55. private compilationTime = 0;
  56. constructor(produced: IQueue) {
  57. this.producer = produced;
  58. }
  59. onJsInvokeMessage(message: any) {
  60. const tStart = performance.now();
  61. let requestId = '';
  62. let responseTopic: string;
  63. let expireTs;
  64. let headers;
  65. let request: RemoteJsRequest = {};
  66. let buf: Buffer;
  67. try {
  68. request = JSON.parse(Buffer.from(message.data).toString('utf8'));
  69. headers = message.headers;
  70. buf = Buffer.from(headers.data['requestId']);
  71. requestId = UUIDFromBuffer(buf);
  72. buf = Buffer.from(headers.data['responseTopic']);
  73. responseTopic = buf.toString('utf8');
  74. buf = Buffer.from(headers.data['expireTs']);
  75. expireTs = Long.fromBytes(Array.from(buf), false, false).toNumber();
  76. const now = Date.now();
  77. if (expireTs && expireTs <= now) {
  78. if (this.logger.isDebugEnabled()) {
  79. this.logger.debug('Message expired! expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now)
  80. }
  81. return;
  82. }
  83. this.logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic);
  84. if (request.compileRequest) {
  85. this.processCompileRequest(requestId, responseTopic, headers, request.compileRequest);
  86. } else if (request.invokeRequest) {
  87. this.processInvokeRequest(requestId, responseTopic, headers, request.invokeRequest);
  88. } else if (request.releaseRequest) {
  89. this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest);
  90. } else {
  91. this.logger.error('[%s] Unknown request received!', requestId);
  92. }
  93. } catch (err: any) {
  94. this.logger.error('[%s] Failed to process request: %s', requestId, err.message);
  95. this.logger.error(err.stack);
  96. }
  97. const tFinish = performance.now();
  98. const tTook = tFinish - tStart;
  99. if (tTook > slowQueryLogMs) {
  100. let functionName;
  101. if (request.invokeRequest) {
  102. try {
  103. buf = Buffer.from(request.invokeRequest['functionName']);
  104. functionName = buf.toString('utf8');
  105. } catch (err: any) {
  106. this.logger.error('[%s] Failed to read functionName from message header: %s', requestId, err.message);
  107. this.logger.error(err.stack);
  108. }
  109. }
  110. this.logger.warn('[%s] SLOW PROCESSING [%s]ms, functionName [%s]', requestId, tTook, functionName);
  111. if (slowQueryLogBody) {
  112. this.logger.info('Slow request body: %s', JSON.stringify(request, null, 4))
  113. }
  114. }
  115. }
  116. processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
  117. const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
  118. this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
  119. if (this.scriptMap.has(scriptId)) {
  120. const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
  121. this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId);
  122. this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
  123. return;
  124. }
  125. this.executor.compileScript(compileRequest.scriptBody).then(
  126. (script) => {
  127. this.cacheScript(scriptId, script);
  128. const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
  129. this.logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId);
  130. this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
  131. },
  132. (err) => {
  133. const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, false, COMPILATION_ERROR, err);
  134. this.logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId);
  135. this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
  136. }
  137. );
  138. }
  139. processInvokeRequest(requestId: string, responseTopic: string, headers: any, invokeRequest: JsInvokeRequest) {
  140. const scriptId = JsInvokeMessageProcessor.getScriptId(invokeRequest);
  141. this.logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);
  142. this.executedScriptsCounter++;
  143. if (this.executedScriptsCounter % statFrequency == 0) {
  144. const nowMs = performance.now();
  145. const msSinceLastStat = nowMs - this.lastStatTime;
  146. const requestsPerSec = msSinceLastStat == 0 ? statFrequency : statFrequency / msSinceLastStat * 1000;
  147. this.lastStatTime = nowMs;
  148. this.logger.info('STAT[%s]: requests [%s], took [%s]ms, request/s [%s], compilation [%s]ms', this.executedScriptsCounter, statFrequency, msSinceLastStat, requestsPerSec, this.compilationTime);
  149. this.compilationTime = 0;
  150. }
  151. if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) {
  152. this.logger.info('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody);
  153. }
  154. if (this.executedScriptsCounter % memoryUsageTraceFrequency == 0) {
  155. this.logger.info('Current memory usage: [%s]', process.memoryUsage());
  156. }
  157. this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then(
  158. (script) => {
  159. this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then(
  160. (result: string | undefined) => {
  161. if (!result || result.length <= maxResultSize) {
  162. const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse(result, true);
  163. this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
  164. this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
  165. } else {
  166. const err = {
  167. name: 'Error',
  168. message: 'script invocation result exceeds maximum allowed size of ' + maxResultSize + ' symbols'
  169. }
  170. const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, RUNTIME_ERROR, err);
  171. this.logger.debug('[%s] Script invocation result exceeds maximum allowed size of %s symbols, scriptId: [%s]', requestId, maxResultSize, scriptId);
  172. this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
  173. }
  174. },
  175. (err: any) => {
  176. let errorCode;
  177. if (err && isString(err.message) && err.message.includes('Script execution timed out')) {
  178. errorCode = TIMEOUT_ERROR;
  179. } else {
  180. errorCode = RUNTIME_ERROR;
  181. }
  182. const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err);
  183. this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
  184. this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
  185. }
  186. )
  187. },
  188. (err: any) => {
  189. let errorCode = COMPILATION_ERROR;
  190. if (err?.name === 'script body not found') {
  191. errorCode = NOT_FOUND_ERROR;
  192. }
  193. const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err);
  194. this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
  195. this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
  196. }
  197. );
  198. }
  199. processReleaseRequest(requestId: string, responseTopic: string, headers: any, releaseRequest: JsReleaseRequest) {
  200. const scriptId = JsInvokeMessageProcessor.getScriptId(releaseRequest);
  201. this.logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);
  202. if (this.scriptMap.has(scriptId)) {
  203. const index = this.scriptIds.indexOf(scriptId);
  204. if (index > -1) {
  205. this.scriptIds.splice(index, 1);
  206. this.executedScriptIdsCounter.splice(index, 1);
  207. }
  208. this.scriptMap.delete(scriptId);
  209. }
  210. const releaseResponse = JsInvokeMessageProcessor.createReleaseResponse(scriptId, true);
  211. this.logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId);
  212. this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, undefined, releaseResponse);
  213. }
  214. sendResponse(requestId: string, responseTopic: string, headers: any, scriptId: string,
  215. compileResponse?: JsCompileResponse, invokeResponse?: JsInvokeResponse, releaseResponse?: JsReleaseResponse) {
  216. const tStartSending = performance.now();
  217. const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
  218. const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
  219. this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
  220. this.producer.send(responseTopic, requestId, rawResponse, headers).then(
  221. () => {
  222. this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
  223. },
  224. (err: any) => {
  225. if (err) {
  226. this.logger.error('[%s] Failed to send response to queue: %s', requestId, err.message);
  227. this.logger.error(err.stack);
  228. }
  229. }
  230. );
  231. }
  232. getOrCompileScript(scriptId: string, scriptBody: string): Promise<TbScript> {
  233. const self = this;
  234. return new Promise(function (resolve, reject) {
  235. const script = self.scriptMap.get(scriptId);
  236. if (script) {
  237. self.incrementUseScriptId(scriptId);
  238. resolve(script);
  239. } else if (scriptBody) {
  240. const startTime = performance.now();
  241. self.executor.compileScript(scriptBody).then(
  242. (compiledScript) => {
  243. self.compilationTime += (performance.now() - startTime);
  244. self.cacheScript(scriptId, compiledScript);
  245. resolve(compiledScript);
  246. },
  247. (err) => {
  248. self.compilationTime += (performance.now() - startTime);
  249. reject(err);
  250. }
  251. );
  252. } else {
  253. const err = {
  254. name: 'script body not found',
  255. message: ''
  256. }
  257. reject(err);
  258. }
  259. });
  260. }
  261. cacheScript(scriptId: string, script: TbScript) {
  262. if (!this.scriptMap.has(scriptId)) {
  263. this.scriptIds.push(scriptId);
  264. this.executedScriptIdsCounter.push(0);
  265. while (this.scriptIds.length > maxActiveScripts) {
  266. this.logger.info('Active scripts count [%s] exceeds maximum limit [%s]', this.scriptIds.length, maxActiveScripts);
  267. this.deleteMinUsedScript();
  268. }
  269. }
  270. this.scriptMap.set(scriptId, script);
  271. this.logger.info("scriptMap size is [%s]", this.scriptMap.size);
  272. }
  273. private static createRemoteResponse(requestId: string, compileResponse?: JsCompileResponse,
  274. invokeResponse?: JsInvokeResponse, releaseResponse?: JsReleaseResponse): RemoteJsResponse {
  275. const requestIdBits = UUIDToBits(requestId);
  276. return {
  277. requestIdMSB: requestIdBits[0],
  278. requestIdLSB: requestIdBits[1],
  279. compileResponse: compileResponse,
  280. invokeResponse: invokeResponse,
  281. releaseResponse: releaseResponse
  282. };
  283. }
  284. private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse {
  285. if (isNotUUID(scriptId)) {
  286. return {
  287. errorCode: errorCode,
  288. success: success,
  289. errorDetails: parseJsErrorDetails(err),
  290. scriptIdMSB: "0",
  291. scriptIdLSB: "0",
  292. scriptHash: scriptId
  293. };
  294. } else { // this is for backward compatibility (to be able to work with tb-node of previous version) - todo: remove in the next release
  295. let scriptIdBits = UUIDToBits(scriptId);
  296. return {
  297. errorCode: errorCode,
  298. success: success,
  299. errorDetails: parseJsErrorDetails(err),
  300. scriptIdMSB: scriptIdBits[0],
  301. scriptIdLSB: scriptIdBits[1],
  302. scriptHash: ""
  303. };
  304. }
  305. }
  306. private static createInvokeResponse(result: string | undefined, success: boolean, errorCode?: number, err?: any): JsInvokeResponse {
  307. return {
  308. errorCode: errorCode,
  309. success: success,
  310. errorDetails: parseJsErrorDetails(err),
  311. result: result
  312. };
  313. }
  314. private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse {
  315. if (isNotUUID(scriptId)) {
  316. return {
  317. success: success,
  318. scriptIdMSB: "0",
  319. scriptIdLSB: "0",
  320. scriptHash: scriptId,
  321. };
  322. } else { // todo: remove in the next release
  323. let scriptIdBits = UUIDToBits(scriptId);
  324. return {
  325. success: success,
  326. scriptIdMSB: scriptIdBits[0],
  327. scriptIdLSB: scriptIdBits[1],
  328. scriptHash: ""
  329. }
  330. }
  331. }
  332. private static getScriptId(request: TbMessage): string {
  333. return request.scriptHash ? request.scriptHash : toUUIDString(request.scriptIdMSB, request.scriptIdLSB);
  334. }
  335. private incrementUseScriptId(scriptId: string) {
  336. const index = this.scriptIds.indexOf(scriptId);
  337. if (this.executedScriptIdsCounter[index] < Number.MAX_SAFE_INTEGER) {
  338. this.executedScriptIdsCounter[index]++;
  339. }
  340. }
  341. private deleteMinUsedScript() {
  342. let min = Infinity;
  343. let minIndex = 0;
  344. const scriptIdsLength = this.executedScriptIdsCounter.length - 1; // ignored last added script
  345. for (let i = 0; i < scriptIdsLength; i++) {
  346. if (this.executedScriptIdsCounter[i] < min) {
  347. min = this.executedScriptIdsCounter[i];
  348. minIndex = i;
  349. }
  350. }
  351. const prevScriptId = this.scriptIds.splice(minIndex, 1)[0];
  352. this.executedScriptIdsCounter.splice(minIndex, 1)
  353. this.logger.info('Removing active script with id [%s]', prevScriptId);
  354. this.scriptMap.delete(prevScriptId);
  355. }
  356. }