// CarTrackingRuleEngine/RuleEngine/RuleEngineWorker.cs
// (Repository-viewer metadata at time of capture: 433 lines, 16 KiB, C#)

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using Newtonsoft.Json;
using RuleEngine.Constants;
using RuleEngine.Database;
using RuleEngine.DTOs;
using RuleEngine.Interfaces;
using Serilog;
namespace RuleEngine
{
public class RuleEngineWorker : BackgroundService
{
// --- Injected dependencies (each assigned once in the constructor) ---
private readonly ILogger<RuleEngineWorker> _logger;
private readonly IConfiguration _config;
// Used to open a DI scope when a PVT update message is processed.
private readonly IServiceProvider _serviceProvider;
// NOTE(review): stored by the constructor but not referenced anywhere else in this class.
private readonly IDbContextFactory<DatabaseContext> _dbContextFactory;
// --- MQTT client state ---
// Created in InitializeMqttClient; disposed and reset to null in DisposeMqttClient.
private IMqttClient? _mqttClient;
// Connection options, built once in the constructor via CreateMqttClientOptions.
private MqttClientOptions _mqttOptions;
// Single-permit semaphore held while the client is initialized, reconnected or disposed.
private readonly SemaphoreSlim _mqttSemaphore = new SemaphoreSlim(1, 1);
// --- MQTT settings, read from the "MqttSettings" configuration section ---
private readonly string _mqttBroker;
private readonly int _mqttPort;
private readonly string _mqttUsername;
private readonly string _mqttPassword;
// NOTE(review): loaded from configuration but not referenced anywhere else in this class.
private readonly string _jwtSecret;
// --- Reconnection ---
// Periodic timer created in InitializeReconnectTimer; null until then (hence the null!).
private System.Timers.Timer _reconnectTimer = null!;
// Set to true by DisposeMqttClient; checked before connect and reconnect attempts.
private bool _isDisposed = false;
/// <summary>
/// Creates the worker, validates its dependencies and loads the MQTT settings from the
/// "MqttSettings" configuration section.
/// </summary>
/// <param name="logger">Logger for this worker.</param>
/// <param name="serviceProvider">Root provider used to create per-message DI scopes.</param>
/// <param name="configuration">Application configuration containing "MqttSettings".</param>
/// <param name="dbContextFactory">Factory for <see cref="DatabaseContext"/> instances.</param>
/// <exception cref="ArgumentNullException">Thrown when any dependency is null.</exception>
public RuleEngineWorker(
    ILogger<RuleEngineWorker> logger,
    IServiceProvider serviceProvider,
    IConfiguration configuration,
    IDbContextFactory<DatabaseContext> dbContextFactory)
{
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _config = configuration ?? throw new ArgumentNullException(nameof(configuration));
    _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    _dbContextFactory = dbContextFactory ?? throw new ArgumentNullException(nameof(dbContextFactory));

    // Load MQTT configuration; broker and port fall back to a local default broker.
    _mqttBroker = _config.GetSection("MqttSettings:Broker").Get<string>() ?? "localhost";
    _mqttPort = _config.GetSection("MqttSettings:Port").Get<int?>() ?? 1883;

    // Secrets must come from configuration only: no fallback value is compiled into the
    // binary. The field is not read anywhere in this class, so an empty default is safe.
    _jwtSecret = _config.GetSection("MqttSettings:JwtSecret").Get<string>() ?? string.Empty;

    _mqttUsername = _config.GetSection("MqttSettings:Username").Get<string>() ?? string.Empty;
    _mqttPassword = _config.GetSection("MqttSettings:Password").Get<string>() ?? string.Empty;

    // Options depend on the settings above, so build them last.
    _mqttOptions = CreateMqttClientOptions();
    _logger.LogInformation("RuleEngineWorker initialized with broker: {Broker}:{Port}", _mqttBroker, _mqttPort);
}
/// <summary>
/// Background entry point: sets up the MQTT client and the reconnect timer, then idles
/// with a 30-second debug heartbeat until <paramref name="stoppingToken"/> is cancelled.
/// The MQTT client is torn down on every exit path.
/// </summary>
/// <param name="stoppingToken">Signals that the host is shutting the worker down.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    void LogHeartbeat()
    {
        if (_logger.IsEnabled(LogLevel.Debug))
        {
            _logger.LogDebug("RuleEngineWorker running at: {Time}", DateTimeOffset.Now);
        }
    }

    var heartbeatInterval = TimeSpan.FromSeconds(30);

    _logger.LogInformation("RuleEngineWorker starting at: {Time}", DateTimeOffset.Now);
    try
    {
        await InitializeMqttClient();
        InitializeReconnectTimer();

        // Nothing to do on this thread besides staying alive; message handling is
        // driven by the MQTT client's callbacks.
        while (!stoppingToken.IsCancellationRequested)
        {
            LogHeartbeat();
            await Task.Delay(heartbeatInterval, stoppingToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Raised by Task.Delay when the stopping token fires.
        _logger.LogInformation("RuleEngineWorker stopping due to cancellation");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Unhandled exception in RuleEngineWorker");
    }
    finally
    {
        await DisposeMqttClient();
        _logger.LogInformation("RuleEngineWorker stopped at: {Time}", DateTimeOffset.Now);
    }
}
/// <summary>
/// Builds the MQTT connection options from the settings loaded in the constructor.
/// </summary>
/// <returns>Options targeting the configured broker host and port.</returns>
private MqttClientOptions CreateMqttClientOptions()
{
    return new MqttClientOptionsBuilder()
        // Use the configured port; previously 1883 was hard-coded here, so the
        // "MqttSettings:Port" value was logged but never actually used.
        .WithTcpServer(_mqttBroker, _mqttPort)
        // Fixed client id, used together with the non-clean session below.
        .WithClientId("RuleEngineWorker")
        .WithCredentials(_mqttUsername, _mqttPassword)
        .WithCleanSession(false)
        .WithSessionExpiryInterval(604800) // 7 days in seconds
        .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
        .WithTimeout(TimeSpan.FromSeconds(15))
        .Build();
}
/// <summary>
/// Creates and starts the timer that attempts an MQTT reconnect every 10 seconds.
/// </summary>
private void InitializeReconnectTimer()
{
    _reconnectTimer = new System.Timers.Timer(10000) // 10 seconds
    {
        AutoReset = true
    };

    // The handler is an async void lambda: an exception escaping it is not observed by
    // the timer and can bring down the process. Catch and log everything here so a
    // failed reconnect attempt can never do that.
    _reconnectTimer.Elapsed += async (sender, e) =>
    {
        try
        {
            await TryReconnectMqtt();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unhandled error in MQTT reconnect timer callback");
        }
    };

    _reconnectTimer.Start();
    _logger.LogInformation("MQTT reconnect timer initialized and started");
}
/// <summary>
/// Creates the MQTT client, attaches the disconnect and message handlers, and performs
/// the first connection attempt. Runs while holding the MQTT semaphore.
/// </summary>
private async Task InitializeMqttClient()
{
    await _mqttSemaphore.WaitAsync();
    try
    {
        var factory = new MqttClientFactory();
        var client = factory.CreateMqttClient();

        // Wire up the callbacks before connecting so no event is missed.
        client.DisconnectedAsync += HandleMqttDisconnection;
        client.ApplicationMessageReceivedAsync += HandleMqttMessageReceived;

        _mqttClient = client;
        await ConnectMqttClient();
    }
    finally
    {
        _mqttSemaphore.Release();
    }
}
/// <summary>
/// Connects the MQTT client to the broker if it is not already connected and then
/// subscribes to the worker's topics. Failures are logged, not thrown.
/// </summary>
private async Task ConnectMqttClient()
{
    // Nothing to connect once the worker is disposed or before the client exists.
    if (_isDisposed || _mqttClient == null)
    {
        return;
    }

    try
    {
        if (_mqttClient.IsConnected)
        {
            return;
        }

        _logger.LogInformation("Connecting to MQTT broker at {Broker}:{Port}...", _mqttBroker, _mqttPort);
        await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
        _logger.LogInformation("Connected to MQTT broker successfully");

        await SubscribeToMqttTopics();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to connect to MQTT broker");
    }
}
/// <summary>
/// Disconnect callback of the MQTT client: logs the disconnect reason and, when the
/// event carries an exception, logs that exception as an error.
/// </summary>
/// <param name="args">Disconnect details supplied by the MQTT client.</param>
private async Task HandleMqttDisconnection(MqttClientDisconnectedEventArgs args)
{
    _logger.LogWarning("Disconnected from MQTT broker: {Reason}", args.Reason);

    // Only disconnects that carry an exception are escalated to error level.
    if (args.Exception is { } failure)
    {
        _logger.LogError(failure, "MQTT disconnection error");
    }

    await Task.CompletedTask;
}
/// <summary>
/// Timer-driven reconnect attempt: reconnects the MQTT client when it exists and is not
/// connected. Never throws; failures are logged.
/// </summary>
private async Task TryReconnectMqtt()
{
    if (_isDisposed)
    {
        return;
    }

    bool lockTaken = false;
    try
    {
        // Zero timeout: if an initialization, reconnect or disposal already holds the
        // semaphore, skip this tick instead of queuing behind it. The timer fires every
        // 10 s while a connect attempt may take up to the 15 s client timeout, so
        // blocking here would let waiting ticks pile up while the broker is unreachable.
        lockTaken = await _mqttSemaphore.WaitAsync(0);
        if (!lockTaken)
        {
            return;
        }

        if (_mqttClient != null && !_mqttClient.IsConnected)
        {
            await ConnectMqttClient();
        }
    }
    catch (ObjectDisposedException)
    {
        // A tick raced with shutdown and the semaphore is already disposed;
        // there is nothing left to reconnect.
        _logger.LogDebug("Skipping MQTT reconnection attempt: worker is shutting down");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error during MQTT reconnection attempt");
    }
    finally
    {
        // Release only what this call actually acquired.
        if (lockTaken)
        {
            _mqttSemaphore.Release();
        }
    }
}
/// <summary>
/// Subscribes the connected client to <c>iot/devices/#</c> with at-least-once QoS and
/// logs the outcome per topic filter. Errors are logged, not thrown.
/// </summary>
private async Task SubscribeToMqttTopics()
{
    if (_mqttClient is not { IsConnected: true })
    {
        return;
    }

    try
    {
        _logger.LogInformation("Subscribing to MQTT topics...");

        var deviceTopics = new MqttTopicFilterBuilder()
            .WithTopic("iot/devices/#")
            .WithAtLeastOnceQoS()
            .Build();

        var outcome = await _mqttClient.SubscribeAsync(deviceTopics);
        foreach (var item in outcome.Items)
        {
            var topicName = item.TopicFilter.Topic;
            var code = item.ResultCode;

            _logger.LogInformation("Subscribed to topic '{Topic}' with ResultCode: {ResultCode}",
                topicName, code);

            // Any result code other than a granted QoS level is reported as a rejection.
            bool granted = code is MqttClientSubscribeResultCode.GrantedQoS0
                or MqttClientSubscribeResultCode.GrantedQoS1
                or MqttClientSubscribeResultCode.GrantedQoS2;
            if (!granted)
            {
                _logger.LogWarning("Subscription to topic {Topic} was rejected: {Reason}",
                    topicName, code);
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error subscribing to MQTT topics");
    }
}
/// <summary>
/// Message callback of the MQTT client: decodes the payload as UTF-8 text, logs the
/// message and forwards it to <c>ProcessMqttMessage</c>. Processing errors are logged
/// and do not propagate to the client.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private async Task HandleMqttMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
    var message = e?.ApplicationMessage;
    if (message is null)
    {
        return;
    }

    var topic = message.Topic;
    var payload = message.Payload;

    // Messages without a body carry nothing to process.
    if (payload.IsEmpty)
    {
        _logger.LogWarning("Received empty payload for topic: {Topic}", topic);
        return;
    }

    var payloadJson = Encoding.UTF8.GetString(payload);
    _logger.LogInformation("Received MQTT message: {Topic} -> {PayloadLength} bytes",
        topic, payload.Length);
    _logger.LogDebug("Message payload: {Payload}", payloadJson);

    try
    {
        await ProcessMqttMessage(topic, payloadJson);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing MQTT message for topic {Topic}", topic);
    }
}
/// <summary>
/// Routes a received message to its handler based on the topic. Topics outside
/// <c>iot/devices/</c> and topics without a known suffix are ignored.
/// </summary>
/// <param name="topic">MQTT topic the message arrived on.</param>
/// <param name="payloadJson">Message payload decoded as text.</param>
private async Task ProcessMqttMessage(string topic, string payloadJson)
{
    // MQTT topics are protocol identifiers, not natural-language text, so they are
    // compared ordinally; the default overloads are culture-sensitive.
    if (!topic.StartsWith("iot/devices/", StringComparison.Ordinal))
    {
        return;
    }

    // Handle device update message
    if (topic.EndsWith("/update_PVT", StringComparison.Ordinal))
    {
        await ProcessUpdatePvtMessage(topic, payloadJson);
    }
    // Add more message handlers here as needed
}
/// <summary>
/// Handles an <c>update_PVT</c> message: deserializes the payload into a
/// <see cref="DeviceMessage"/>, applies it through <see cref="IDeviceService"/> inside a
/// fresh DI scope, and publishes a success or error response on the device's response topic.
/// </summary>
/// <param name="topic">Topic the message arrived on (used for logging only).</param>
/// <param name="payloadJson">JSON text of the message.</param>
private async Task ProcessUpdatePvtMessage(string topic, string payloadJson)
{
    // Single factory for all three response variants; the timestamp is taken at the
    // moment the response object is created.
    static object BuildResponse(string? imei, string status, string message) => new
    {
        Imei = imei,
        Status = status,
        Message = message,
        Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
    };

    DeviceMessage? messageData;
    try
    {
        messageData = JsonConvert.DeserializeObject<DeviceMessage>(payloadJson);
    }
    catch (JsonException ex)
    {
        _logger.LogError(ex, "Failed to deserialize message payload");
        return;
    }

    if (messageData is null)
    {
        _logger.LogError("Deserialized message is null for topic {Topic}", topic);
        return;
    }

    // The device service is resolved from its own scope for each message.
    using var scope = _serviceProvider.CreateScope();
    var deviceService = scope.ServiceProvider.GetRequiredService<IDeviceService>();

    try
    {
        bool updated = await deviceService.UpdatePVT(messageData);
        string imei = messageData.Imei!;
        string responseTopic = string.Format(MqttTopic.UpdatePVTResponseFormat, imei);

        object responsePayload;
        if (updated)
        {
            _logger.LogInformation("Update PVT for device {Imei} succeeded", imei);
            responsePayload = BuildResponse(imei, "success", "Update PVT success");
        }
        else
        {
            _logger.LogWarning("Update PVT for device {Imei} failed", imei);
            responsePayload = BuildResponse(imei, "error", "Update PVT failed");
        }

        await PublishMqttMessage(responseTopic, responsePayload);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing PVT update for device {Imei}", messageData.Imei);

        // Best effort: still tell the device that the update did not go through.
        string errorTopic = string.Format(MqttTopic.UpdatePVTResponseFormat, messageData.Imei);
        object errorPayload = BuildResponse(messageData.Imei, "error", "Internal server error");
        await PublishMqttMessage(errorTopic, errorPayload);
    }
}
/// <summary>
/// Serializes <paramref name="payload"/> to JSON and publishes it to
/// <paramref name="topic"/> with at-least-once QoS and no retain flag. When the client is
/// missing or disconnected the message is dropped with a warning; publish errors are logged.
/// </summary>
/// <param name="topic">Destination MQTT topic.</param>
/// <param name="payload">Object to serialize as the message body.</param>
private async Task PublishMqttMessage(string topic, object payload)
{
    if (_mqttClient is not { IsConnected: true })
    {
        _logger.LogWarning("Cannot publish message: MQTT client is not connected");
        return;
    }

    try
    {
        var body = JsonConvert.SerializeObject(payload);

        var outgoing = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(body)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
            .WithRetainFlag(false)
            .Build();

        await _mqttClient.PublishAsync(outgoing, CancellationToken.None);

        _logger.LogInformation("Published message to topic {Topic}", topic);
        _logger.LogDebug("Message payload: {Payload}", body);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error publishing message to topic {Topic}", topic);
    }
}
/// <summary>
/// Marks the worker as disposed, stops the reconnect timer and disconnects and disposes
/// the MQTT client. Safe to call more than once; errors are logged, not thrown.
/// </summary>
private async Task DisposeMqttClient()
{
    await _mqttSemaphore.WaitAsync();
    try
    {
        // Set first so connect/reconnect paths that check the flag back off.
        _isDisposed = true;

        if (_reconnectTimer != null)
        {
            _reconnectTimer.Stop();
            _reconnectTimer.Dispose();
        }

        var client = _mqttClient;
        if (client != null)
        {
            try
            {
                if (client.IsConnected)
                {
                    await client.DisconnectAsync();
                    _logger.LogInformation("Disconnected from MQTT broker");
                }
            }
            finally
            {
                // Dispose even when the graceful disconnect throws; previously a failed
                // DisconnectAsync skipped Dispose and left the client referenced.
                client.Dispose();
                _mqttClient = null;
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error disposing MQTT client");
    }
    finally
    {
        _mqttSemaphore.Release();
    }
}
/// <summary>
/// Stops the worker: tears down the MQTT client and reconnect timer, then hands over to
/// the base class to complete the shutdown.
/// </summary>
/// <param name="cancellationToken">Passed through to <c>base.StopAsync</c>.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping RuleEngineWorker");
// Release the MQTT resources before the base class runs its own stop logic.
await DisposeMqttClient();
await base.StopAsync(cancellationToken);
}
/// <summary>
/// Disposes the semaphore that guards the MQTT client, then the base service.
/// </summary>
public override void Dispose()
{
// The MQTT client and timer are released in DisposeMqttClient; only the semaphore is owned here.
_mqttSemaphore.Dispose();
base.Dispose();
}
}
}