kafkaTemplate.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. ///
  2. /// Copyright © 2016-2023 The Thingsboard Authors
  3. ///
  4. /// Licensed under the Apache License, Version 2.0 (the "License");
  5. /// you may not use this file except in compliance with the License.
  6. /// You may obtain a copy of the License at
  7. ///
  8. /// http://www.apache.org/licenses/LICENSE-2.0
  9. ///
  10. /// Unless required by applicable law or agreed to in writing, software
  11. /// distributed under the License is distributed on an "AS IS" BASIS,
  12. /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. /// See the License for the specific language governing permissions and
  14. /// limitations under the License.
  15. ///
  16. import config from 'config';
  17. import { _logger, KafkaJsWinstonLogCreator } from '../config/logger';
  18. import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
  19. import { IQueue } from './queue.models';
  20. import {
  21. Admin,
  22. CompressionTypes,
  23. Consumer,
  24. Kafka,
  25. KafkaConfig,
  26. logLevel,
  27. Partitioners,
  28. Producer,
  29. TopicMessages
  30. } from 'kafkajs';
  31. import process, { kill, exit } from 'process';
  32. export class KafkaTemplate implements IQueue {
  33. private logger = _logger(`kafkaTemplate`);
  34. private replicationFactor = Number(config.get('kafka.replication_factor'));
  35. private topicProperties: string = config.get('kafka.topic_properties');
  36. private kafkaClientId: string = config.get('kafka.client_id');
  37. private acks = Number(config.get('kafka.acks'));
  38. private maxBatchSize = Number(config.get('kafka.batch_size'));
  39. private linger = Number(config.get('kafka.linger_ms'));
  40. private requestTimeout = Number(config.get('kafka.requestTimeout'));
  41. private connectionTimeout = Number(config.get('kafka.connectionTimeout'));
  42. private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
  43. private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently'));
  44. private kafkaClient: Kafka;
  45. private kafkaAdmin: Admin;
  46. private consumer: Consumer;
  47. private producer: Producer;
  48. private configEntries: any[] = [];
  49. private batchMessages: TopicMessages[] = [];
  50. private sendLoopInstance: NodeJS.Timeout;
  51. name = 'Kafka';
  52. constructor() {
  53. }
  54. async init(): Promise<void> {
  55. const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
  56. const requestTopic: string = config.get('request_topic');
  57. const useConfluent = config.get('kafka.use_confluent_cloud');
  58. this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
  59. this.logger.info('Kafka Requests Topic: %s', requestTopic);
  60. let kafkaConfig: KafkaConfig = {
  61. brokers: kafkaBootstrapServers.split(','),
  62. logLevel: logLevel.INFO,
  63. logCreator: KafkaJsWinstonLogCreator
  64. };
  65. if (this.kafkaClientId) {
  66. kafkaConfig['clientId'] = this.kafkaClientId;
  67. } else {
  68. this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
  69. }
  70. kafkaConfig['requestTimeout'] = this.requestTimeout;
  71. kafkaConfig['connectionTimeout'] = this.connectionTimeout;
  72. if (useConfluent) {
  73. kafkaConfig['sasl'] = {
  74. mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
  75. username: config.get('kafka.confluent.username'),
  76. password: config.get('kafka.confluent.password')
  77. };
  78. kafkaConfig['ssl'] = true;
  79. }
  80. this.parseTopicProperties();
  81. this.kafkaClient = new Kafka(kafkaConfig);
  82. this.kafkaAdmin = this.kafkaClient.admin();
  83. await this.kafkaAdmin.connect();
  84. let partitions = 1;
  85. for (let i = 0; i < this.configEntries.length; i++) {
  86. let param = this.configEntries[i];
  87. if (param.name === 'partitions') {
  88. partitions = param.value;
  89. this.configEntries.splice(i, 1);
  90. break;
  91. }
  92. }
  93. let topics = await this.kafkaAdmin.listTopics();
  94. if (!topics.includes(requestTopic)) {
  95. let createRequestTopicResult = await this.createTopic(requestTopic, partitions);
  96. if (createRequestTopicResult) {
  97. this.logger.info('Created new topic: %s', requestTopic);
  98. }
  99. }
  100. this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
  101. this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});
  102. const {CRASH} = this.consumer.events;
  103. this.consumer.on(CRASH, async (e) => {
  104. this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
  105. if (!e.payload.restart) {
  106. this.logger.error('Going to exit due to not retryable error!');
  107. kill(process.pid, 'SIGTERM'); //sending signal to myself process to trigger the handler
  108. await this.destroy();
  109. }
  110. });
  111. const messageProcessor = new JsInvokeMessageProcessor(this);
  112. await this.consumer.connect();
  113. await this.producer.connect();
  114. this.sendLoopWithLinger();
  115. await this.consumer.subscribe({topic: requestTopic});
  116. await this.consumer.run({
  117. partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
  118. eachMessage: async ({topic, partition, message}) => {
  119. let headers = message.headers;
  120. let key = message.key || new Buffer([]);
  121. let msg = {
  122. key: key.toString('utf8'),
  123. data: message.value,
  124. headers: {
  125. data: headers
  126. }
  127. };
  128. messageProcessor.onJsInvokeMessage(msg);
  129. },
  130. });
  131. }
  132. async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
  133. const message = {
  134. topic: responseTopic,
  135. messages: [{
  136. key: msgKey,
  137. value: rawResponse,
  138. headers: headers.data
  139. }]
  140. };
  141. await this.pushMessageToSendLater(message);
  142. }
  143. private async pushMessageToSendLater(message: TopicMessages) {
  144. this.batchMessages.push(message);
  145. if (this.batchMessages.length >= this.maxBatchSize) {
  146. await this.sendMessagesAsBatch(true);
  147. }
  148. }
  149. private async sendMessagesAsBatch(isImmediately = false): Promise<void> {
  150. if (this.sendLoopInstance) {
  151. clearTimeout(this.sendLoopInstance);
  152. }
  153. if (this.batchMessages.length > 0) {
  154. this.logger.debug('sendMessagesAsBatch, length: [%s], %s', this.batchMessages.length, isImmediately ? 'immediately' : '');
  155. const messagesToSend = this.batchMessages;
  156. this.batchMessages = [];
  157. try {
  158. await this.producer.sendBatch({
  159. topicMessages: messagesToSend,
  160. acks: this.acks,
  161. compression: this.compressionType
  162. })
  163. this.logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
  164. } catch (err: any) {
  165. this.logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length);
  166. this.logger.error(err.stack);
  167. this.batchMessages = messagesToSend.concat(this.batchMessages);
  168. }
  169. }
  170. this.sendLoopWithLinger();
  171. }
  172. private parseTopicProperties() {
  173. const props = this.topicProperties.split(';');
  174. props.forEach(p => {
  175. const delimiterPosition = p.indexOf(':');
  176. this.configEntries.push({
  177. name: p.substring(0, delimiterPosition),
  178. value: p.substring(delimiterPosition + 1)
  179. });
  180. });
  181. }
  182. private createTopic(topic: string, partitions: number): Promise<boolean> {
  183. return this.kafkaAdmin.createTopics({
  184. topics: [{
  185. topic: topic,
  186. numPartitions: partitions,
  187. replicationFactor: this.replicationFactor,
  188. configEntries: this.configEntries
  189. }]
  190. });
  191. }
  192. private sendLoopWithLinger() {
  193. if (this.sendLoopInstance) {
  194. clearTimeout(this.sendLoopInstance);
  195. // } else {
  196. // this.logger.debug("Starting new send loop with linger [%s]", this.linger)
  197. }
  198. this.sendLoopInstance = setTimeout(async () => {
  199. await this.sendMessagesAsBatch()
  200. }, this.linger);
  201. }
  202. async destroy(): Promise<void> {
  203. this.logger.info('Stopping Kafka resources...');
  204. if (this.kafkaAdmin) {
  205. this.logger.info('Stopping Kafka Admin...');
  206. const _kafkaAdmin = this.kafkaAdmin;
  207. // @ts-ignore
  208. delete this.kafkaAdmin;
  209. await _kafkaAdmin.disconnect();
  210. this.logger.info('Kafka Admin stopped.');
  211. }
  212. if (this.consumer) {
  213. this.logger.info('Stopping Kafka Consumer...');
  214. try {
  215. const _consumer = this.consumer;
  216. // @ts-ignore
  217. delete this.consumer;
  218. await _consumer.disconnect();
  219. this.logger.info('Kafka Consumer stopped.');
  220. await this.disconnectProducer();
  221. } catch (e: any) {
  222. this.logger.info('Kafka Consumer stop error.');
  223. await this.disconnectProducer();
  224. }
  225. }
  226. this.logger.info('Kafka resources stopped.');
  227. exit(0); //same as in version before
  228. }
  229. private async disconnectProducer(): Promise<void> {
  230. if (this.producer) {
  231. this.logger.info('Stopping Kafka Producer...');
  232. try {
  233. this.logger.info('Stopping loop...');
  234. clearTimeout(this.sendLoopInstance);
  235. await this.sendMessagesAsBatch();
  236. const _producer = this.producer;
  237. // @ts-ignore
  238. delete this.producer;
  239. await _producer.disconnect();
  240. this.logger.info('Kafka Producer stopped.');
  241. } catch (e) {
  242. this.logger.info('Kafka Producer stop error.');
  243. }
  244. }
  245. }
  246. }