433 lines
16 KiB
C#
433 lines
16 KiB
C#
using System;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using MQTTnet;
|
|
using Newtonsoft.Json;
|
|
using RuleEngine.Constants;
|
|
using RuleEngine.Database;
|
|
using RuleEngine.DTOs;
|
|
using RuleEngine.Interfaces;
|
|
using Serilog;
|
|
|
|
namespace RuleEngine
|
|
{
|
|
public class RuleEngineWorker : BackgroundService
|
|
{
|
|
// Configuration and dependency injection
|
|
private readonly ILogger<RuleEngineWorker> _logger;
|
|
private readonly IConfiguration _config;
|
|
private readonly IServiceProvider _serviceProvider;
|
|
private readonly IDbContextFactory<DatabaseContext> _dbContextFactory;
|
|
|
|
// MQTT client
|
|
private IMqttClient? _mqttClient;
|
|
private MqttClientOptions _mqttOptions;
|
|
private readonly SemaphoreSlim _mqttSemaphore = new SemaphoreSlim(1, 1);
|
|
|
|
// MQTT settings
|
|
private readonly string _mqttBroker;
|
|
private readonly int _mqttPort;
|
|
private readonly string _mqttUsername;
|
|
private readonly string _mqttPassword;
|
|
private readonly string _jwtSecret;
|
|
|
|
// Reconnection timer
|
|
private System.Timers.Timer _reconnectTimer = null!;
|
|
private bool _isDisposed = false;
|
|
|
|
public RuleEngineWorker(
|
|
ILogger<RuleEngineWorker> logger,
|
|
IServiceProvider serviceProvider,
|
|
IConfiguration configuration,
|
|
IDbContextFactory<DatabaseContext> dbContextFactory)
|
|
{
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
_config = configuration ?? throw new ArgumentNullException(nameof(configuration));
|
|
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
|
|
_dbContextFactory = dbContextFactory ?? throw new ArgumentNullException(nameof(dbContextFactory));
|
|
|
|
// Load MQTT configuration
|
|
_mqttBroker = _config.GetSection("MqttSettings:Broker").Get<string>() ?? "localhost";
|
|
_mqttPort = _config.GetSection("MqttSettings:Port").Get<int?>() ?? 1883;
|
|
_jwtSecret = _config.GetSection("MqttSettings:JwtSecret").Get<string>() ?? "emqx@sivan";
|
|
_mqttUsername = _config.GetSection("MqttSettings:Username").Get<string>() ?? string.Empty;
|
|
_mqttPassword = _config.GetSection("MqttSettings:Password").Get<string>() ?? string.Empty;
|
|
|
|
// Create MQTT options
|
|
_mqttOptions = CreateMqttClientOptions();
|
|
|
|
_logger.LogInformation("RuleEngineWorker initialized with broker: {Broker}:{Port}", _mqttBroker, _mqttPort);
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
_logger.LogInformation("RuleEngineWorker starting at: {Time}", DateTimeOffset.Now);
|
|
|
|
try
|
|
{
|
|
await InitializeMqttClient();
|
|
InitializeReconnectTimer();
|
|
|
|
// Keep the service running
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
if (_logger.IsEnabled(LogLevel.Debug))
|
|
{
|
|
_logger.LogDebug("RuleEngineWorker running at: {Time}", DateTimeOffset.Now);
|
|
}
|
|
await Task.Delay(30000, stoppingToken); // Heartbeat every 30 seconds
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.LogInformation("RuleEngineWorker stopping due to cancellation");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Unhandled exception in RuleEngineWorker");
|
|
}
|
|
finally
|
|
{
|
|
await DisposeMqttClient();
|
|
_logger.LogInformation("RuleEngineWorker stopped at: {Time}", DateTimeOffset.Now);
|
|
}
|
|
}
|
|
|
|
private MqttClientOptions CreateMqttClientOptions()
|
|
{
|
|
return new MqttClientOptionsBuilder()
|
|
.WithTcpServer("localhost", 1883)
|
|
.WithClientId($"RuleEngineWorker-{Guid.NewGuid()}")
|
|
.WithCredentials(_mqttUsername, _mqttPassword)
|
|
.WithCleanSession(false)
|
|
.WithSessionExpiryInterval(604800) // 7 days in seconds
|
|
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
|
|
.WithTimeout(TimeSpan.FromSeconds(15))
|
|
.Build();
|
|
}
|
|
|
|
private void InitializeReconnectTimer()
|
|
{
|
|
_reconnectTimer = new System.Timers.Timer(10000); // 10 seconds
|
|
_reconnectTimer.Elapsed += async (sender, e) => await TryReconnectMqtt();
|
|
_reconnectTimer.AutoReset = true;
|
|
_reconnectTimer.Start();
|
|
|
|
_logger.LogInformation("MQTT reconnect timer initialized and started");
|
|
}
|
|
|
|
private async Task InitializeMqttClient()
|
|
{
|
|
await _mqttSemaphore.WaitAsync();
|
|
try
|
|
{
|
|
_mqttClient = new MqttClientFactory().CreateMqttClient();
|
|
|
|
// Set up event handlers
|
|
_mqttClient.DisconnectedAsync += HandleMqttDisconnection;
|
|
_mqttClient.ApplicationMessageReceivedAsync += HandleMqttMessageReceived;
|
|
|
|
await ConnectMqttClient();
|
|
}
|
|
finally
|
|
{
|
|
_mqttSemaphore.Release();
|
|
}
|
|
}
|
|
|
|
private async Task ConnectMqttClient()
|
|
{
|
|
if (_mqttClient == null || _isDisposed)
|
|
return;
|
|
|
|
try
|
|
{
|
|
if (!_mqttClient.IsConnected)
|
|
{
|
|
_logger.LogInformation("Connecting to MQTT broker at {Broker}:{Port}...", _mqttBroker, _mqttPort);
|
|
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
|
|
_logger.LogInformation("Connected to MQTT broker successfully");
|
|
|
|
await SubscribeToMqttTopics();
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to connect to MQTT broker");
|
|
}
|
|
}
|
|
|
|
private async Task HandleMqttDisconnection(MqttClientDisconnectedEventArgs args)
|
|
{
|
|
_logger.LogWarning("Disconnected from MQTT broker: {Reason}", args.Reason);
|
|
|
|
// ReasonCode 0 means normal disconnect, no need to log as error
|
|
if (args.Exception != null)
|
|
{
|
|
_logger.LogError(args.Exception, "MQTT disconnection error");
|
|
}
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
private async Task TryReconnectMqtt()
|
|
{
|
|
if (_isDisposed)
|
|
return;
|
|
|
|
await _mqttSemaphore.WaitAsync();
|
|
try
|
|
{
|
|
if (_mqttClient != null && !_mqttClient.IsConnected)
|
|
{
|
|
await ConnectMqttClient();
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error during MQTT reconnection attempt");
|
|
}
|
|
finally
|
|
{
|
|
_mqttSemaphore.Release();
|
|
}
|
|
}
|
|
|
|
private async Task SubscribeToMqttTopics()
|
|
{
|
|
if (_mqttClient == null || !_mqttClient.IsConnected)
|
|
return;
|
|
|
|
try
|
|
{
|
|
_logger.LogInformation("Subscribing to MQTT topics...");
|
|
|
|
var topicFilter = new MqttTopicFilterBuilder()
|
|
.WithTopic("iot/devices/#")
|
|
.WithAtLeastOnceQoS()
|
|
.Build();
|
|
|
|
var subscribeResult = await _mqttClient.SubscribeAsync(topicFilter);
|
|
|
|
foreach (var result in subscribeResult.Items)
|
|
{
|
|
_logger.LogInformation("Subscribed to topic '{Topic}' with ResultCode: {ResultCode}",
|
|
result.TopicFilter.Topic, result.ResultCode);
|
|
|
|
if (result.ResultCode != MqttClientSubscribeResultCode.GrantedQoS0 &&
|
|
result.ResultCode != MqttClientSubscribeResultCode.GrantedQoS1 &&
|
|
result.ResultCode != MqttClientSubscribeResultCode.GrantedQoS2)
|
|
{
|
|
_logger.LogWarning("Subscription to topic {Topic} was rejected: {Reason}",
|
|
result.TopicFilter.Topic, result.ResultCode);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error subscribing to MQTT topics");
|
|
}
|
|
}
|
|
|
|
private async Task HandleMqttMessageReceived(MqttApplicationMessageReceivedEventArgs e)
|
|
{
|
|
if (e == null || e.ApplicationMessage == null)
|
|
return;
|
|
|
|
string topic = e.ApplicationMessage.Topic;
|
|
var payloadBytes = e.ApplicationMessage.Payload;
|
|
|
|
if (payloadBytes.IsEmpty)
|
|
{
|
|
_logger.LogWarning("Received empty payload for topic: {Topic}", topic);
|
|
return;
|
|
}
|
|
|
|
string payloadJson = Encoding.UTF8.GetString(payloadBytes);
|
|
|
|
_logger.LogInformation("Received MQTT message: {Topic} -> {PayloadLength} bytes",
|
|
topic, payloadBytes.Length);
|
|
_logger.LogDebug("Message payload: {Payload}", payloadJson);
|
|
|
|
try
|
|
{
|
|
await ProcessMqttMessage(topic, payloadJson);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error processing MQTT message for topic {Topic}", topic);
|
|
}
|
|
}
|
|
|
|
private async Task ProcessMqttMessage(string topic, string payloadJson)
|
|
{
|
|
if (!topic.StartsWith("iot/devices/"))
|
|
return;
|
|
|
|
// Handle device update message
|
|
if (topic.EndsWith("/update_PVT"))
|
|
{
|
|
await ProcessUpdatePvtMessage(topic, payloadJson);
|
|
}
|
|
|
|
// Add more message handlers here as needed
|
|
}
|
|
|
|
private async Task ProcessUpdatePvtMessage(string topic, string payloadJson)
|
|
{
|
|
DeviceMessage? messageData;
|
|
|
|
try
|
|
{
|
|
messageData = JsonConvert.DeserializeObject<DeviceMessage>(payloadJson);
|
|
}
|
|
catch (JsonException ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to deserialize message payload");
|
|
return;
|
|
}
|
|
|
|
if (messageData == null)
|
|
{
|
|
_logger.LogError("Deserialized message is null for topic {Topic}", topic);
|
|
return;
|
|
}
|
|
|
|
using var scope = _serviceProvider.CreateScope();
|
|
var deviceService = scope.ServiceProvider.GetRequiredService<IDeviceService>();
|
|
|
|
try
|
|
{
|
|
bool result = await deviceService.UpdatePVT(messageData);
|
|
string imei = messageData.Imei!;
|
|
string responseTopic = string.Format(MqttTopic.UpdatePVTResponseFormat, imei);
|
|
|
|
object responsePayload;
|
|
|
|
if (result)
|
|
{
|
|
_logger.LogInformation("Update PVT for device {Imei} succeeded", imei);
|
|
responsePayload = new
|
|
{
|
|
Imei = imei,
|
|
Status = "success",
|
|
Message = "Update PVT success",
|
|
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
|
|
};
|
|
}
|
|
else
|
|
{
|
|
_logger.LogWarning("Update PVT for device {Imei} failed", imei);
|
|
responsePayload = new
|
|
{
|
|
Imei = imei,
|
|
Status = "error",
|
|
Message = "Update PVT failed",
|
|
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
|
|
};
|
|
}
|
|
|
|
await PublishMqttMessage(responseTopic, responsePayload);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error processing PVT update for device {Imei}", messageData.Imei);
|
|
|
|
// Try to send error response
|
|
string responseTopic = string.Format(MqttTopic.UpdatePVTResponseFormat, messageData.Imei);
|
|
var errorPayload = new
|
|
{
|
|
Imei = messageData.Imei,
|
|
Status = "error",
|
|
Message = "Internal server error",
|
|
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
|
|
};
|
|
|
|
await PublishMqttMessage(responseTopic, errorPayload);
|
|
}
|
|
}
|
|
|
|
private async Task PublishMqttMessage(string topic, object payload)
|
|
{
|
|
if (_mqttClient == null || !_mqttClient.IsConnected)
|
|
{
|
|
_logger.LogWarning("Cannot publish message: MQTT client is not connected");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
string payloadJson = JsonConvert.SerializeObject(payload);
|
|
|
|
var message = new MqttApplicationMessageBuilder()
|
|
.WithTopic(topic)
|
|
.WithPayload(payloadJson)
|
|
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
|
|
.WithRetainFlag(false)
|
|
.Build();
|
|
|
|
await _mqttClient.PublishAsync(message, CancellationToken.None);
|
|
_logger.LogInformation("Published message to topic {Topic}", topic);
|
|
_logger.LogDebug("Message payload: {Payload}", payloadJson);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error publishing message to topic {Topic}", topic);
|
|
}
|
|
}
|
|
|
|
private async Task DisposeMqttClient()
|
|
{
|
|
await _mqttSemaphore.WaitAsync();
|
|
try
|
|
{
|
|
_isDisposed = true;
|
|
|
|
if (_reconnectTimer != null)
|
|
{
|
|
_reconnectTimer.Stop();
|
|
_reconnectTimer.Dispose();
|
|
}
|
|
|
|
if (_mqttClient != null)
|
|
{
|
|
if (_mqttClient.IsConnected)
|
|
{
|
|
await _mqttClient.DisconnectAsync();
|
|
_logger.LogInformation("Disconnected from MQTT broker");
|
|
}
|
|
|
|
_mqttClient.Dispose();
|
|
_mqttClient = null;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error disposing MQTT client");
|
|
}
|
|
finally
|
|
{
|
|
_mqttSemaphore.Release();
|
|
}
|
|
}
|
|
|
|
public override async Task StopAsync(CancellationToken cancellationToken)
|
|
{
|
|
_logger.LogInformation("Stopping RuleEngineWorker");
|
|
await DisposeMqttClient();
|
|
await base.StopAsync(cancellationToken);
|
|
}
|
|
|
|
public override void Dispose()
|
|
{
|
|
_mqttSemaphore.Dispose();
|
|
base.Dispose();
|
|
}
|
|
}
|
|
} |