using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using Newtonsoft.Json;
using RuleEngine.Constants;
using RuleEngine.Database;
using RuleEngine.DTOs;
using RuleEngine.Interfaces;
using Serilog;

namespace RuleEngine
{
    /// <summary>
    /// Hosted background worker that keeps an MQTT client connected to the configured broker,
    /// subscribes to <c>iot/devices/#</c>, dispatches incoming device messages, and publishes
    /// a response for each processed <c>update_PVT</c> message.
    /// </summary>
    public class RuleEngineWorker : BackgroundService
    {
        // Topic layout handled by this worker.
        private const string DeviceTopicPrefix = "iot/devices/";
        private const string DeviceTopicFilter = "iot/devices/#";
        private const string UpdatePvtTopicSuffix = "/update_PVT";

        private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(30);
        private static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(10);

        // Configuration and dependency injection
        // The generic logger is used because a bare `ILogger` is ambiguous between
        // Microsoft.Extensions.Logging and Serilog, both of which are imported above.
        private readonly ILogger<RuleEngineWorker> _logger;
        private readonly IConfiguration _config;
        private readonly IServiceProvider _serviceProvider;

        // NOTE(review): EF Core's IDbContextFactory is generic (IDbContextFactory<TContext>); the
        // type argument is missing here and the context type is not visible in this file — restore
        // it. The field is stored but not read anywhere in this class.
        private readonly IDbContextFactory _dbContextFactory;

        // MQTT client
        private IMqttClient? _mqttClient;
        private readonly MqttClientOptions _mqttOptions;

        // Serializes connect / reconnect / dispose so they never run concurrently.
        private readonly SemaphoreSlim _mqttSemaphore = new SemaphoreSlim(1, 1);

        // MQTT settings
        private readonly string _mqttBroker;
        private readonly int _mqttPort;
        private readonly string _mqttUsername;
        private readonly string _mqttPassword;
        private readonly string _jwtSecret;

        // Reconnection timer; null until ExecuteAsync has started it and again after disposal.
        private System.Timers.Timer? _reconnectTimer;

        // Read from timer and MQTT callback threads, hence volatile.
        private volatile bool _isDisposed;

        /// <summary>
        /// Creates the worker, reads the <c>MqttSettings</c> configuration section and builds the
        /// MQTT client options. No network connection is made here.
        /// </summary>
        /// <param name="logger">Logger for this worker.</param>
        /// <param name="serviceProvider">Root provider used to create a scope per processed message.</param>
        /// <param name="configuration">Application configuration containing <c>MqttSettings</c>.</param>
        /// <param name="dbContextFactory">Database context factory (stored, not used by this class).</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public RuleEngineWorker(
            ILogger<RuleEngineWorker> logger,
            IServiceProvider serviceProvider,
            IConfiguration configuration,
            IDbContextFactory dbContextFactory)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _config = configuration ?? throw new ArgumentNullException(nameof(configuration));
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            _dbContextFactory = dbContextFactory ?? throw new ArgumentNullException(nameof(dbContextFactory));

            // Load MQTT configuration, falling back to defaults when a key is absent.
            _mqttBroker = _config.GetSection("MqttSettings:Broker").Get<string>() ?? "localhost";
            _mqttPort = _config.GetSection("MqttSettings:Port").Get<int?>() ?? 1883;

            // NOTE(review): a hard-coded fallback secret in source is a security risk, and this
            // field is not read anywhere in this class — consider removing the default and
            // requiring the value from configuration / a secret store.
            _jwtSecret = _config.GetSection("MqttSettings:JwtSecret").Get<string>() ?? "emqx@sivan";
            _mqttUsername = _config.GetSection("MqttSettings:Username").Get<string>() ?? string.Empty;
            _mqttPassword = _config.GetSection("MqttSettings:Password").Get<string>() ?? string.Empty;

            // Create MQTT options (must run after the settings above are assigned).
            _mqttOptions = CreateMqttClientOptions();

            _logger.LogInformation("RuleEngineWorker initialized with broker: {Broker}:{Port}", _mqttBroker, _mqttPort);
        }

        /// <summary>
        /// Connects the MQTT client, starts the reconnect timer, then idles (emitting a debug
        /// heartbeat every 30 seconds) until <paramref name="stoppingToken"/> is cancelled.
        /// The MQTT client is always torn down on exit.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("RuleEngineWorker starting at: {Time}", DateTimeOffset.Now);

            try
            {
                await InitializeMqttClient(stoppingToken);
                InitializeReconnectTimer();

                // Keep the service running
                while (!stoppingToken.IsCancellationRequested)
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                    {
                        _logger.LogDebug("RuleEngineWorker running at: {Time}", DateTimeOffset.Now);
                    }

                    await Task.Delay(HeartbeatInterval, stoppingToken); // Heartbeat every 30 seconds
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("RuleEngineWorker stopping due to cancellation");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Unhandled exception in RuleEngineWorker");
            }
            finally
            {
                await DisposeMqttClient();
                _logger.LogInformation("RuleEngineWorker stopped at: {Time}", DateTimeOffset.Now);
            }
        }

        /// <summary>
        /// Builds the MQTT client options from the configured broker, port and credentials.
        /// </summary>
        /// <returns>Options for a persistent (non-clean) session with a fixed client id.</returns>
        private MqttClientOptions CreateMqttClientOptions()
        {
            return new MqttClientOptionsBuilder()
                .WithTcpServer(_mqttBroker, _mqttPort) // use the configured port, not a literal 1883
                .WithClientId("RuleEngineWorker")
                .WithCredentials(_mqttUsername, _mqttPassword)
                .WithCleanSession(false)
                .WithSessionExpiryInterval(604800) // 7 days in seconds
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
                .WithTimeout(TimeSpan.FromSeconds(15))
                .Build();
        }

        /// <summary>
        /// Creates and starts the auto-resetting timer that triggers a reconnect attempt every 10 seconds.
        /// </summary>
        private void InitializeReconnectTimer()
        {
            var timer = new System.Timers.Timer(ReconnectInterval.TotalMilliseconds); // 10 seconds
            timer.Elapsed += OnReconnectTimerElapsed;
            timer.AutoReset = true;
            _reconnectTimer = timer;
            timer.Start();

            _logger.LogInformation("MQTT reconnect timer initialized and started");
        }

        /// <summary>
        /// Timer callback. It is necessarily <c>async void</c>, so every exception is caught here:
        /// an exception escaping an <c>async void</c> method cannot be observed by any caller.
        /// </summary>
        private async void OnReconnectTimerElapsed(object? sender, System.Timers.ElapsedEventArgs e)
        {
            try
            {
                await TryReconnectMqtt();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Unhandled error in MQTT reconnect timer callback");
            }
        }

        /// <summary>
        /// Creates the MQTT client, wires its event handlers and performs the first connection attempt.
        /// </summary>
        /// <param name="cancellationToken">Cancels waiting for the lock and the connection attempt.</param>
        private async Task InitializeMqttClient(CancellationToken cancellationToken)
        {
            await _mqttSemaphore.WaitAsync(cancellationToken);
            try
            {
                _mqttClient = new MqttClientFactory().CreateMqttClient();

                // Set up event handlers
                _mqttClient.DisconnectedAsync += HandleMqttDisconnection;
                _mqttClient.ApplicationMessageReceivedAsync += HandleMqttMessageReceived;

                await ConnectMqttClient(cancellationToken);
            }
            finally
            {
                _mqttSemaphore.Release();
            }
        }

        /// <summary>
        /// Connects to the broker if not already connected and subscribes to the device topics.
        /// Connection failures are logged, not thrown, so the reconnect timer can retry later.
        /// Callers are expected to hold <c>_mqttSemaphore</c>.
        /// </summary>
        /// <param name="cancellationToken">Cancels the connection attempt; cancellation is rethrown.</param>
        private async Task ConnectMqttClient(CancellationToken cancellationToken = default)
        {
            var client = _mqttClient;
            if (client == null || _isDisposed)
            {
                return;
            }

            try
            {
                if (!client.IsConnected)
                {
                    _logger.LogInformation("Connecting to MQTT broker at {Broker}:{Port}...", _mqttBroker, _mqttPort);
                    await client.ConnectAsync(_mqttOptions, cancellationToken);
                    _logger.LogInformation("Connected to MQTT broker successfully");

                    await SubscribeToMqttTopics();
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown was requested: let the caller observe it instead of logging a failure.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to connect to MQTT broker");
            }
        }

        /// <summary>
        /// Logs a broker disconnection. Reconnecting is left to the reconnect timer.
        /// </summary>
        private Task HandleMqttDisconnection(MqttClientDisconnectedEventArgs args)
        {
            _logger.LogWarning("Disconnected from MQTT broker: {Reason}", args.Reason);

            // A normal disconnect carries no exception, so only log an error when one is present.
            if (args.Exception != null)
            {
                _logger.LogError(args.Exception, "MQTT disconnection error");
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Attempts a reconnect when the client exists but is not connected. If another
        /// connect or dispose operation currently holds the lock, this tick is skipped.
        /// </summary>
        private async Task TryReconnectMqtt()
        {
            if (_isDisposed)
            {
                return;
            }

            bool lockTaken;
            try
            {
                // Zero timeout: do not queue timer ticks behind a connect attempt that is still running.
                lockTaken = await _mqttSemaphore.WaitAsync(0);
            }
            catch (ObjectDisposedException)
            {
                // The worker was disposed between the timer firing and this callback running.
                return;
            }

            if (!lockTaken)
            {
                return;
            }

            try
            {
                if (_mqttClient != null && !_mqttClient.IsConnected)
                {
                    await ConnectMqttClient();
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during MQTT reconnection attempt");
            }
            finally
            {
                _mqttSemaphore.Release();
            }
        }

        /// <summary>
        /// Subscribes to <c>iot/devices/#</c> at QoS 1 and logs the outcome for each topic filter.
        /// Failures are logged and not rethrown.
        /// </summary>
        private async Task SubscribeToMqttTopics()
        {
            var client = _mqttClient;
            if (client == null || !client.IsConnected)
            {
                return;
            }

            try
            {
                _logger.LogInformation("Subscribing to MQTT topics...");

                var topicFilter = new MqttTopicFilterBuilder()
                    .WithTopic(DeviceTopicFilter)
                    .WithAtLeastOnceQoS()
                    .Build();

                var subscribeResult = await client.SubscribeAsync(topicFilter);

                foreach (var result in subscribeResult.Items)
                {
                    bool granted =
                        result.ResultCode == MqttClientSubscribeResultCode.GrantedQoS0 ||
                        result.ResultCode == MqttClientSubscribeResultCode.GrantedQoS1 ||
                        result.ResultCode == MqttClientSubscribeResultCode.GrantedQoS2;

                    if (granted)
                    {
                        _logger.LogInformation(
                            "Subscribed to topic '{Topic}' with ResultCode: {ResultCode}",
                            result.TopicFilter.Topic,
                            result.ResultCode);
                    }
                    else
                    {
                        // Do not report a rejected subscription as "Subscribed".
                        _logger.LogWarning(
                            "Subscription to topic {Topic} was rejected: {Reason}",
                            result.TopicFilter.Topic,
                            result.ResultCode);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error subscribing to MQTT topics");
            }
        }

        /// <summary>
        /// Entry point for every received MQTT message: decodes the payload as UTF-8 and hands it
        /// to <see cref="ProcessMqttMessage"/>. Processing errors are logged and never rethrown.
        /// </summary>
        private async Task HandleMqttMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            if (e == null || e.ApplicationMessage == null)
            {
                return;
            }

            string topic = e.ApplicationMessage.Topic;
            var payloadBytes = e.ApplicationMessage.Payload;

            if (payloadBytes.IsEmpty)
            {
                _logger.LogWarning("Received empty payload for topic: {Topic}", topic);
                return;
            }

            string payloadJson = Encoding.UTF8.GetString(payloadBytes);
            _logger.LogInformation("Received MQTT message: {Topic} -> {PayloadLength} bytes", topic, payloadBytes.Length);
            _logger.LogDebug("Message payload: {Payload}", payloadJson);

            try
            {
                await ProcessMqttMessage(topic, payloadJson);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing MQTT message for topic {Topic}", topic);
            }
        }

        /// <summary>
        /// Routes a device message to its handler based on the topic. Topics outside
        /// <c>iot/devices/</c> and unrecognised suffixes are ignored.
        /// </summary>
        /// <param name="topic">Topic the message was received on.</param>
        /// <param name="payloadJson">UTF-8 decoded message payload.</param>
        private async Task ProcessMqttMessage(string topic, string payloadJson)
        {
            // Topics are protocol identifiers, so compare ordinally rather than by culture.
            if (!topic.StartsWith(DeviceTopicPrefix, StringComparison.Ordinal))
            {
                return;
            }

            // Handle device update message
            if (topic.EndsWith(UpdatePvtTopicSuffix, StringComparison.Ordinal))
            {
                await ProcessUpdatePvtMessage(topic, payloadJson);
            }

            // Add more message handlers here as needed
        }

        /// <summary>
        /// Deserializes an <c>update_PVT</c> payload, applies it through the scoped device service
        /// and publishes a success or error response for the device.
        /// </summary>
        /// <param name="topic">Topic the message was received on (used for logging only).</param>
        /// <param name="payloadJson">JSON payload expected to deserialize to <c>DeviceMessage</c>.</param>
        private async Task ProcessUpdatePvtMessage(string topic, string payloadJson)
        {
            DeviceMessage? messageData;
            try
            {
                messageData = JsonConvert.DeserializeObject<DeviceMessage>(payloadJson);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Failed to deserialize message payload");
                return;
            }

            if (messageData == null)
            {
                _logger.LogError("Deserialized message is null for topic {Topic}", topic);
                return;
            }

            using var scope = _serviceProvider.CreateScope();

            // NOTE(review): the generic type argument of GetRequiredService is missing in this file
            // and the service type is not visible here (presumably an interface from
            // RuleEngine.Interfaces exposing UpdatePVT) — restore it.
            var deviceService = scope.ServiceProvider.GetRequiredService();

            string? imei = messageData.Imei;

            try
            {
                bool succeeded = await deviceService.UpdatePVT(messageData);

                if (succeeded)
                {
                    _logger.LogInformation("Update PVT for device {Imei} succeeded", imei);
                    await PublishPvtResponse(imei, "success", "Update PVT success");
                }
                else
                {
                    _logger.LogWarning("Update PVT for device {Imei} failed", imei);
                    await PublishPvtResponse(imei, "error", "Update PVT failed");
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing PVT update for device {Imei}", imei);

                // Try to send error response
                await PublishPvtResponse(imei, "error", "Internal server error");
            }
        }

        /// <summary>
        /// Publishes the response for an <c>update_PVT</c> request to the device's response topic.
        /// Skipped with a warning when the IMEI is missing, because the topic cannot be built.
        /// </summary>
        /// <param name="imei">Device IMEI taken from the incoming message; may be absent.</param>
        /// <param name="status">Value of the <c>Status</c> field in the response.</param>
        /// <param name="message">Value of the <c>Message</c> field in the response.</param>
        private async Task PublishPvtResponse(string? imei, string status, string message)
        {
            if (string.IsNullOrWhiteSpace(imei))
            {
                _logger.LogWarning("Cannot publish PVT response: message has no IMEI");
                return;
            }

            string responseTopic = string.Format(MqttTopic.UpdatePVTResponseFormat, imei);
            var responsePayload = new
            {
                Imei = imei,
                Status = status,
                Message = message,
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            };

            await PublishMqttMessage(responseTopic, responsePayload);
        }

        /// <summary>
        /// Serializes <paramref name="payload"/> to JSON and publishes it at QoS 1 without the
        /// retain flag. Does nothing (apart from a warning) when the client is not connected;
        /// publish failures are logged and not rethrown.
        /// </summary>
        /// <param name="topic">Destination topic.</param>
        /// <param name="payload">Object to serialize as the message body.</param>
        private async Task PublishMqttMessage(string topic, object payload)
        {
            // Capture once: the field can be cleared concurrently by DisposeMqttClient.
            var client = _mqttClient;
            if (client == null || !client.IsConnected)
            {
                _logger.LogWarning("Cannot publish message: MQTT client is not connected");
                return;
            }

            try
            {
                string payloadJson = JsonConvert.SerializeObject(payload);

                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(payloadJson)
                    .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
                    .WithRetainFlag(false)
                    .Build();

                await client.PublishAsync(message, CancellationToken.None);

                _logger.LogInformation("Published message to topic {Topic}", topic);
                _logger.LogDebug("Message payload: {Payload}", payloadJson);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing message to topic {Topic}", topic);
            }
        }

        /// <summary>
        /// Stops the reconnect timer and disconnects and disposes the MQTT client.
        /// Safe to call more than once: it is invoked from both <see cref="StopAsync"/> and the
        /// <c>finally</c> block of <see cref="ExecuteAsync"/>.
        /// </summary>
        private async Task DisposeMqttClient()
        {
            try
            {
                await _mqttSemaphore.WaitAsync();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() already ran; there is nothing left to guard.
                return;
            }

            try
            {
                _isDisposed = true;

                var timer = _reconnectTimer;
                _reconnectTimer = null;
                if (timer != null)
                {
                    timer.Stop();
                    timer.Dispose();
                }

                var client = _mqttClient;
                _mqttClient = null;
                if (client != null)
                {
                    try
                    {
                        if (client.IsConnected)
                        {
                            await client.DisconnectAsync();
                            _logger.LogInformation("Disconnected from MQTT broker");
                        }
                    }
                    finally
                    {
                        // Release the client even when the disconnect itself fails.
                        client.Dispose();
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error disposing MQTT client");
            }
            finally
            {
                _mqttSemaphore.Release();
            }
        }

        /// <summary>
        /// Tears down the MQTT client, then lets the base class cancel and await <see cref="ExecuteAsync"/>.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Stopping RuleEngineWorker");
            await DisposeMqttClient();
            await base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Releases the reconnect timer (if still alive) and the semaphore, then disposes the base service.
        /// </summary>
        public override void Dispose()
        {
            _isDisposed = true;
            _reconnectTimer?.Dispose();
            _mqttSemaphore.Dispose();
            base.Dispose();
        }
    }
}