123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- ///
- /// Copyright © 2016-2023 The Thingsboard Authors
- ///
- /// Licensed under the Apache License, Version 2.0 (the "License");
- /// you may not use this file except in compliance with the License.
- /// You may obtain a copy of the License at
- ///
- /// http://www.apache.org/licenses/LICENSE-2.0
- ///
- /// Unless required by applicable law or agreed to in writing, software
- /// distributed under the License is distributed on an "AS IS" BASIS,
- /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- /// See the License for the specific language governing permissions and
- /// limitations under the License.
- ///
- import config from 'config';
- import { _logger } from '../config/logger';
- import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
- import { IQueue } from './queue.models';
- import {
- CreateQueueOptions,
- ProcessErrorArgs,
- ServiceBusAdministrationClient,
- ServiceBusClient,
- ServiceBusReceivedMessage,
- ServiceBusReceiver,
- ServiceBusSender
- } from '@azure/service-bus';
- export class ServiceBusTemplate implements IQueue {
- private logger = _logger(`serviceBusTemplate`);
- private requestTopic: string = config.get('request_topic');
- private namespaceName = config.get('service_bus.namespace_name');
- private sasKeyName = config.get('service_bus.sas_key_name');
- private sasKey = config.get('service_bus.sas_key');
- private queueProperties: string = config.get('service_bus.queue_properties');
- private sbClient: ServiceBusClient;
- private serviceBusService: ServiceBusAdministrationClient;
- private queueOptions: CreateQueueOptions = {};
- private queues: string[] = [];
- private receiver: ServiceBusReceiver;
- private senderMap = new Map<string, ServiceBusSender>();
- name = 'Azure Service Bus';
- constructor() {
- }
- async init() {
- const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`;
- this.sbClient = new ServiceBusClient(connectionString)
- this.serviceBusService = new ServiceBusAdministrationClient(connectionString);
- this.parseQueueProperties();
- const listQueues = await this.serviceBusService.listQueues();
- for await (const queue of listQueues) {
- this.queues.push(queue.name);
- }
- if (!this.queues.includes(this.requestTopic)) {
- await this.createQueueIfNotExist(this.requestTopic);
- this.queues.push(this.requestTopic);
- }
- this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'});
- const messageProcessor = new JsInvokeMessageProcessor(this);
- const messageHandler = async (message: ServiceBusReceivedMessage) => {
- if (message) {
- messageProcessor.onJsInvokeMessage(message.body);
- await this.receiver.completeMessage(message);
- }
- };
- const errorHandler = async (error: ProcessErrorArgs) => {
- this.logger.error('Failed to receive message from queue.', error);
- };
- this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
- }
- async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
- if (!this.queues.includes(this.requestTopic)) {
- await this.createQueueIfNotExist(this.requestTopic);
- this.queues.push(this.requestTopic);
- }
- let customSender = this.senderMap.get(responseTopic);
- if (!customSender) {
- customSender = this.sbClient.createSender(responseTopic);
- this.senderMap.set(responseTopic, customSender);
- }
- let data = {
- key: msgKey,
- data: [...rawResponse],
- headers: headers
- };
- return customSender.sendMessages({body: data});
- }
- private parseQueueProperties() {
- let properties: { [n: string]: string } = {};
- const props = this.queueProperties.split(';');
- props.forEach(p => {
- const delimiterPosition = p.indexOf(':');
- properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
- });
- this.queueOptions = {
- requiresDuplicateDetection: false,
- maxSizeInMegabytes: Number(properties['maxSizeInMb']),
- defaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
- lockDuration: `PT${properties['lockDurationInSec']}S`
- };
- }
- private async createQueueIfNotExist(topic: string) {
- try {
- await this.serviceBusService.createQueue(topic, this.queueOptions)
- } catch (err: any) {
- if (err && err.code !== "MessageEntityAlreadyExistsError") {
- throw new Error(err);
- }
- }
- }
- async destroy() {
- this.logger.info('Stopping Azure Service Bus resources...')
- if (this.receiver) {
- this.logger.info('Stopping Service Bus Receiver...');
- try {
- const _receiver = this.receiver;
- // @ts-ignore
- delete this.receiver;
- await _receiver.close();
- this.logger.info('Service Bus Receiver stopped.');
- } catch (e) {
- this.logger.info('Service Bus Receiver stop error.');
- }
- }
- this.logger.info('Stopping Service Bus Senders...');
- const senders: Promise<void>[] = [];
- this.senderMap.forEach((sender) => {
- senders.push(sender.close());
- });
- this.senderMap.clear();
- try {
- await Promise.all(senders);
- this.logger.info('Service Bus Senders stopped.');
- } catch (e) {
- this.logger.info('Service Bus Senders stop error.');
- }
- if (this.sbClient) {
- this.logger.info('Stopping Service Bus Client...');
- try {
- const _sbClient = this.sbClient;
- // @ts-ignore
- delete this.sbClient;
- await _sbClient.close();
- this.logger.info('Service Bus Client stopped.');
- } catch (e) {
- this.logger.info('Service Bus Client stop error.');
- }
- }
- this.logger.info('Azure Service Bus resources stopped.')
- }
- }
|