123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- ///
- /// Copyright © 2016-2023 The Thingsboard Authors
- ///
- /// Licensed under the Apache License, Version 2.0 (the "License");
- /// you may not use this file except in compliance with the License.
- /// You may obtain a copy of the License at
- ///
- /// http://www.apache.org/licenses/LICENSE-2.0
- ///
- /// Unless required by applicable law or agreed to in writing, software
- /// distributed under the License is distributed on an "AS IS" BASIS,
- /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- /// See the License for the specific language governing permissions and
- /// limitations under the License.
- ///
- import config from 'config';
- import { _logger } from '../config/logger';
- import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
- import { PubSub } from '@google-cloud/pubsub';
- import { IQueue } from './queue.models';
- import { Message } from '@google-cloud/pubsub/build/src/subscriber';
/**
 * {@link IQueue} implementation backed by Google Cloud Pub/Sub.
 *
 * `init()` subscribes to the configured request topic and hands every received
 * message to a {@link JsInvokeMessageProcessor}; `send()` publishes a response
 * envelope to the topic named by the caller. Topics and subscriptions are
 * created on demand, and a subscription always carries the same name as its topic.
 */
export class PubSubTemplate implements IQueue {

    /** gRPC status code (`ALREADY_EXISTS`) carried in the `code` field of client errors. */
    private static readonly ALREADY_EXISTS_CODE = 6;

    private logger = _logger(`pubSubTemplate`);
    private projectId: string = config.get('pubsub.project_id');
    private credentials = JSON.parse(config.get('pubsub.service_account'));
    private requestTopic: string = config.get('request_topic');
    /** Raw `key:value;key:value` string; parsed into {@link queueProps} by `init()`. */
    private queueProperties: string = config.get('pubsub.queue_properties');

    /** Set by `init()`, cleared by `destroy()`. */
    private pubSubClient?: PubSub;

    private queueProps: { [n: string]: string } = {};
    /** Short names of topics that are known to exist or whose creation was already attempted. */
    private topics: string[] = [];
    /** Short names of subscriptions that are known to exist or whose creation was already attempted. */
    private subscriptions: string[] = [];

    name = 'Pub/Sub';

    constructor() {
    }

    /**
     * Creates the Pub/Sub client, caches the names of the existing topics and
     * subscriptions, makes sure the request topic and its subscription exist and
     * starts consuming the request subscription.
     */
    async init(): Promise<void> {
        const client = new PubSub({
            projectId: this.projectId,
            credentials: this.credentials
        });
        this.pubSubClient = client;

        this.parseQueueProperties();

        const [existingTopics] = await client.getTopics();
        for (const topic of existingTopics) {
            this.topics.push(PubSubTemplate.getName(topic.name));
        }

        // The subscription cache must be filled from the subscription listing;
        // filling it from the topic listing hides a missing request subscription.
        const [existingSubscriptions] = await client.getSubscriptions();
        for (const subscription of existingSubscriptions) {
            this.subscriptions.push(PubSubTemplate.getName(subscription.name));
        }

        await this.ensureTopicAndSubscription(this.requestTopic);

        const messageProcessor = new JsInvokeMessageProcessor(this);

        const messageHandler = (message: Message) => {
            try {
                messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
            } catch (e) {
                // A payload that cannot be parsed or dispatched is logged and still
                // acknowledged below, so the exception does not escape the listener.
                this.logger.error('Failed to process Pub/Sub message %s: %s',
                    message.id, PubSubTemplate.describeError(e));
            }
            message.ack();
        };

        client.subscription(this.requestTopic).on('message', messageHandler);
    }

    /**
     * Publishes a response envelope to `responseTopic`, creating the topic and
     * its same-named subscription first if they are not cached yet.
     *
     * @param responseTopic short name of the topic to publish to
     * @param msgKey value stored under `key` in the envelope
     * @param rawResponse payload; serialized as an array of byte values under `data`
     * @param headers value stored under `headers` in the envelope
     * @returns the result of the client's `publishMessage` call
     * @throws Error when the client has not been initialized or was destroyed
     */
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
        await this.ensureTopicAndSubscription(responseTopic);

        const payload = JSON.stringify(
            {
                key: msgKey,
                data: [...rawResponse],
                headers: headers
            });
        return this.client.topic(responseTopic).publishMessage({data: Buffer.from(payload)});
    }

    /** Returns the live client or throws when `init()` has not run or `destroy()` already did. */
    private get client(): PubSub {
        if (!this.pubSubClient) {
            throw new Error('Pub/Sub client is not initialized');
        }
        return this.pubSubClient;
    }

    /**
     * Splits {@link queueProperties} on `;` and stores every `key:value` pair in
     * {@link queueProps}. Only the first `:` separates key from value. Segments
     * that are empty, have no `:` or have an empty key are skipped.
     */
    private parseQueueProperties(): void {
        for (const entry of this.queueProperties.split(';')) {
            const delimiterPosition = entry.indexOf(':');
            if (delimiterPosition <= 0) {
                continue;
            }
            const key = entry.substring(0, delimiterPosition).trim();
            if (key) {
                this.queueProps[key] = entry.substring(delimiterPosition + 1).trim();
            }
        }
    }

    /** Returns the part of `fullName` after its last `/` (the whole string when there is none). */
    private static getName(fullName: string): string {
        const delimiterPosition = fullName.lastIndexOf('/');
        return fullName.substring(delimiterPosition + 1);
    }

    /** True when `e` is an object whose `code` equals the `ALREADY_EXISTS` status code. */
    private static isAlreadyExists(e: unknown): boolean {
        return typeof e === 'object' && e !== null
            && (e as { code?: unknown }).code === PubSubTemplate.ALREADY_EXISTS_CODE;
    }

    /** Renders a caught value for logging. */
    private static describeError(e: unknown): string {
        return e instanceof Error ? e.message : String(e);
    }

    /** Ensures both the topic and the subscription named `topic` exist; each step is skipped when cached. */
    private async ensureTopicAndSubscription(topic: string): Promise<void> {
        await this.createTopic(topic);
        await this.createSubscription(topic);
    }

    /**
     * Creates `topic` unless its name is already cached. Never throws: a failure
     * is logged and the name is cached anyway, so creation is attempted at most
     * once per name.
     */
    private async createTopic(topic: string): Promise<void> {
        if (this.topics.includes(topic)) {
            return;
        }
        try {
            await this.client.createTopic(topic);
            this.logger.info('Created new Pub/Sub topic: %s', topic);
        } catch (e) {
            if (PubSubTemplate.isAlreadyExists(e)) {
                this.logger.info('Pub/Sub topic already exists');
            } else {
                this.logger.error('Failed to create Pub/Sub topic %s: %s',
                    topic, PubSubTemplate.describeError(e));
            }
        }
        this.topics.push(topic);
    }

    /**
     * Creates a subscription named `topic` on the topic `topic` unless its name is
     * already cached. `ackDeadlineInSec` and `messageRetentionInSec` from
     * {@link queueProps} are sent only when they are set. Never throws: a failure
     * is logged and the name is cached anyway, so creation is attempted at most
     * once per name.
     */
    private async createSubscription(topic: string): Promise<void> {
        if (this.subscriptions.includes(topic)) {
            return;
        }
        const ackDeadlineSeconds = Number(this.queueProps['ackDeadlineInSec']);
        const messageRetentionSeconds = this.queueProps['messageRetentionInSec'];
        try {
            await this.client.createSubscription(topic, topic, {
                topic: topic,
                name: topic,
                // Number(undefined) is NaN; leave the field out instead of sending it.
                ...(Number.isFinite(ackDeadlineSeconds) ? {ackDeadlineSeconds} : {}),
                ...(messageRetentionSeconds
                    ? {messageRetentionDuration: {seconds: messageRetentionSeconds}}
                    : {})
            });
            this.logger.info('Created new Pub/Sub subscription: %s', topic);
        } catch (e) {
            if (PubSubTemplate.isAlreadyExists(e)) {
                this.logger.info('Pub/Sub subscription already exists.');
            } else {
                this.logger.error('Failed to create Pub/Sub subscription %s: %s',
                    topic, PubSubTemplate.describeError(e));
            }
        }
        this.subscriptions.push(topic);
    }

    /**
     * Closes the Pub/Sub client if one is open. A failure to close is logged and
     * not rethrown.
     */
    async destroy(): Promise<void> {
        this.logger.info('Stopping Pub/Sub resources...');
        const client = this.pubSubClient;
        if (client) {
            this.logger.info('Stopping Pub/Sub client...');
            // Drop the reference before awaiting so a repeated destroy() skips the close.
            this.pubSubClient = undefined;
            try {
                await client.close();
                this.logger.info('Pub/Sub client stopped.');
            } catch (e) {
                this.logger.info('Pub/Sub client stop error.');
            }
        }
        this.logger.info('Pub/Sub resources stopped.');
    }

}
|