rabbitmqTemplate.ts 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. ///
  2. /// Copyright © 2016-2023 The Thingsboard Authors
  3. ///
  4. /// Licensed under the Apache License, Version 2.0 (the "License");
  5. /// you may not use this file except in compliance with the License.
  6. /// You may obtain a copy of the License at
  7. ///
  8. /// http://www.apache.org/licenses/LICENSE-2.0
  9. ///
  10. /// Unless required by applicable law or agreed to in writing, software
  11. /// distributed under the License is distributed on an "AS IS" BASIS,
  12. /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. /// See the License for the specific language governing permissions and
  14. /// limitations under the License.
  15. ///
  16. import config from 'config';
  17. import { _logger } from '../config/logger';
  18. import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
  19. import { IQueue } from './queue.models';
  20. import amqp, { ConfirmChannel, Connection } from 'amqplib';
  21. import { Options, Replies } from 'amqplib/properties';
  22. export class RabbitMqTemplate implements IQueue {
  23. private logger = _logger(`rabbitmqTemplate`);
  24. private requestTopic: string = config.get('request_topic');
  25. private host = config.get('rabbitmq.host');
  26. private port = config.get('rabbitmq.port');
  27. private vhost = config.get('rabbitmq.virtual_host');
  28. private username = config.get('rabbitmq.username');
  29. private password = config.get('rabbitmq.password');
  30. private queueProperties: string = config.get('rabbitmq.queue_properties');
  31. private queueOptions: Options.AssertQueue = {
  32. durable: false,
  33. exclusive: false,
  34. autoDelete: false
  35. };
  36. private connection: Connection;
  37. private channel: ConfirmChannel;
  38. private topics: string[] = [];
  39. name = 'RabbitMQ';
  40. constructor() {
  41. }
  42. async init(): Promise<void> {
  43. const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`;
  44. this.connection = await amqp.connect(url);
  45. this.channel = await this.connection.createConfirmChannel();
  46. this.parseQueueProperties();
  47. await this.createQueue(this.requestTopic);
  48. const messageProcessor = new JsInvokeMessageProcessor(this);
  49. await this.channel.consume(this.requestTopic, (message) => {
  50. if (message) {
  51. messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
  52. this.channel.ack(message);
  53. }
  54. })
  55. }
  56. async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
  57. if (!this.topics.includes(responseTopic)) {
  58. await this.createQueue(responseTopic);
  59. this.topics.push(responseTopic);
  60. }
  61. let data = JSON.stringify(
  62. {
  63. key: msgKey,
  64. data: [...rawResponse],
  65. headers: headers
  66. });
  67. let dataBuffer = Buffer.from(data);
  68. this.channel.sendToQueue(responseTopic, dataBuffer);
  69. return this.channel.waitForConfirms()
  70. }
  71. private parseQueueProperties() {
  72. let args: { [n: string]: number } = {};
  73. const props = this.queueProperties.split(';');
  74. props.forEach(p => {
  75. const delimiterPosition = p.indexOf(':');
  76. args[p.substring(0, delimiterPosition)] = Number(p.substring(delimiterPosition + 1));
  77. });
  78. this.queueOptions['arguments'] = args;
  79. }
  80. private async createQueue(topic: string): Promise<Replies.AssertQueue> {
  81. return this.channel.assertQueue(topic, this.queueOptions);
  82. }
  83. async destroy() {
  84. this.logger.info('Stopping RabbitMQ resources...');
  85. if (this.channel) {
  86. this.logger.info('Stopping RabbitMQ chanel...');
  87. const _channel = this.channel;
  88. // @ts-ignore
  89. delete this.channel;
  90. await _channel.close();
  91. this.logger.info('RabbitMQ chanel stopped');
  92. }
  93. if (this.connection) {
  94. this.logger.info('Stopping RabbitMQ connection...')
  95. try {
  96. const _connection = this.connection;
  97. // @ts-ignore
  98. delete this.connection;
  99. await _connection.close();
  100. this.logger.info('RabbitMQ client connection.');
  101. } catch (e) {
  102. this.logger.info('RabbitMQ connection stop error.');
  103. }
  104. }
  105. this.logger.info('RabbitMQ resources stopped.')
  106. }
  107. }