pubSubTemplate.ts 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. ///
  2. /// Copyright © 2016-2023 The Thingsboard Authors
  3. ///
  4. /// Licensed under the Apache License, Version 2.0 (the "License");
  5. /// you may not use this file except in compliance with the License.
  6. /// You may obtain a copy of the License at
  7. ///
  8. /// http://www.apache.org/licenses/LICENSE-2.0
  9. ///
  10. /// Unless required by applicable law or agreed to in writing, software
  11. /// distributed under the License is distributed on an "AS IS" BASIS,
  12. /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. /// See the License for the specific language governing permissions and
  14. /// limitations under the License.
  15. ///
  16. import config from 'config';
  17. import { _logger } from '../config/logger';
  18. import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
  19. import { PubSub } from '@google-cloud/pubsub';
  20. import { IQueue } from './queue.models';
  21. import { Message } from '@google-cloud/pubsub/build/src/subscriber';
  /**
   * Google Cloud Pub/Sub implementation of {@link IQueue}.
   *
   * On {@link init} it connects to Pub/Sub, makes sure the request topic and a
   * same-named subscription exist, and forwards every received message to a
   * {@link JsInvokeMessageProcessor}. Responses are published with {@link send}.
   *
   * Topic and subscription names known to this instance are cached in
   * `topics` / `subscriptions` so that existence checks do not hit the API on
   * every call.
   */
  export class PubSubTemplate implements IQueue {

      private logger = _logger(`pubSubTemplate`);
      private projectId: string = config.get('pubsub.project_id');
      // The service account is configured as a JSON string and parsed once here.
      private credentials = JSON.parse(config.get('pubsub.service_account'));
      private requestTopic: string = config.get('request_topic');
      // Raw "key:value;key:value" string; parsed into queueProps by parseQueueProperties().
      private queueProperties: string = config.get('pubsub.queue_properties');

      private pubSubClient: PubSub;
      private queueProps: { [n: string]: string } = {};
      // Short names (last path segment) of the topics / subscriptions known to exist.
      private topics: string[] = [];
      private subscriptions: string[] = [];

      name = 'Pub/Sub';

      constructor() {
      }

      /**
       * Creates the Pub/Sub client, loads the existing topic and subscription
       * names, ensures the request topic/subscription exist and starts
       * listening for request messages.
       */
      async init(): Promise<void> {
          this.pubSubClient = new PubSub({
              projectId: this.projectId,
              credentials: this.credentials
          });

          this.parseQueueProperties();

          const topicList = await this.pubSubClient.getTopics();
          if (topicList) {
              topicList[0].forEach(topic => {
                  this.topics.push(PubSubTemplate.getName(topic.name));
              });
          }

          const subscriptionList = await this.pubSubClient.getSubscriptions();
          if (subscriptionList) {
              // Iterate the subscription list (not the topic list) so the cache
              // really reflects which subscriptions exist.
              subscriptionList[0].forEach(sub => {
                  this.subscriptions.push(PubSubTemplate.getName(sub.name));
              });
          }

          if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) {
              await this.createTopic(this.requestTopic);
              await this.createSubscription(this.requestTopic);
          }

          // The subscription carries the same name as the request topic.
          const subscription = this.pubSubClient.subscription(this.requestTopic);
          const messageProcessor = new JsInvokeMessageProcessor(this);

          const messageHandler = (message: Message) => {
              messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
              message.ack();
          };

          subscription.on('message', messageHandler);
      }

      /**
       * Publishes a response to `responseTopic`, creating the topic and a
       * same-named subscription first when they are not known to exist.
       *
       * @param responseTopic name of the topic to publish to
       * @param msgKey value stored under `key` in the published JSON envelope
       * @param rawResponse response bytes; serialized as an array of byte values
       * @param headers value stored under `headers` in the published JSON envelope
       * @returns the promise returned by the client's `publishMessage`
       */
      async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
          // Ensure the topic we are about to publish to exists (the request topic
          // is already handled by init()).
          if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(responseTopic))) {
              await this.createTopic(responseTopic);
              await this.createSubscription(responseTopic);
          }

          const data = JSON.stringify(
              {
                  key: msgKey,
                  data: [...rawResponse],
                  headers: headers
              });
          const dataBuffer = Buffer.from(data);
          return this.pubSubClient.topic(responseTopic).publishMessage({data: dataBuffer});
      }

      /**
       * Parses the `key:value;key:value` queue properties string into `queueProps`.
       * The first ':' of an entry separates key from value; entries without a key
       * (empty segments, no delimiter) are ignored.
       */
      private parseQueueProperties() {
          const props = this.queueProperties.split(';');
          props.forEach(p => {
              const delimiterPosition = p.indexOf(':');
              if (delimiterPosition < 1) {
                  // Nothing usable before the delimiter (or no delimiter at all).
                  return;
              }
              this.queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
          });
      }

      /**
       * Returns the part of `fullName` after the last '/', or the whole string
       * when it contains no '/'.
       */
      private static getName(fullName: string): string {
          const delimiterPosition = fullName.lastIndexOf('/');
          return fullName.substring(delimiterPosition + 1);
      }

      /**
       * Creates `topic` unless it is already in the local cache, then caches it.
       * Creation is best-effort: a failure is logged and the topic is still cached.
       */
      private async createTopic(topic: string): Promise<void> {
          if (!this.topics.includes(topic)) {
              try {
                  await this.pubSubClient.createTopic(topic);
                  this.logger.info('Created new Pub/Sub topic: %s', topic);
              } catch (e) {
                  // NOTE(review): every failure is reported as "already exists";
                  // other causes (permissions, network) are not distinguished here.
                  this.logger.info('Pub/Sub topic already exists');
              }
              this.topics.push(topic);
          }
      }

      /**
       * Creates a subscription named `topic` on the topic `topic` unless it is
       * already in the local cache, then caches it. The ack deadline and message
       * retention are taken from the parsed queue properties
       * (`ackDeadlineInSec`, `messageRetentionInSec`).
       * Creation is best-effort: a failure is logged and the name is still cached.
       */
      private async createSubscription(topic: string): Promise<void> {
          if (!this.subscriptions.includes(topic)) {
              try {
                  await this.pubSubClient.createSubscription(topic, topic, {
                      topic: topic,
                      name: topic,
                      ackDeadlineSeconds: Number(this.queueProps['ackDeadlineInSec']),
                      messageRetentionDuration: {
                          seconds: this.queueProps['messageRetentionInSec']
                      }
                  });
                  this.logger.info('Created new Pub/Sub subscription: %s', topic);
              } catch (e) {
                  // NOTE(review): every failure is reported as "already exists";
                  // other causes (permissions, network) are not distinguished here.
                  this.logger.info('Pub/Sub subscription already exists.');
              }
              this.subscriptions.push(topic);
          }
      }

      /**
       * Closes the Pub/Sub client if one was created. Errors raised while
       * closing are logged and not rethrown.
       */
      async destroy(): Promise<void> {
          this.logger.info('Stopping Pub/Sub resources...');
          if (this.pubSubClient) {
              this.logger.info('Stopping Pub/Sub client...');
              try {
                  // Drop the field before awaiting close() so a repeated destroy()
                  // call does not try to close the same client again.
                  const _pubSubClient = this.pubSubClient;
                  // @ts-ignore
                  delete this.pubSubClient;
                  await _pubSubClient.close();
                  this.logger.info('Pub/Sub client stopped.');
              } catch (e) {
                  this.logger.info('Pub/Sub client stop error.');
              }
          }
          this.logger.info('Pub/Sub resources stopped.');
      }

  }