package sync import ( "bytes" "encoding/base64" "encoding/json" "fmt" "io" "log" "mime/multipart" "net/http" "os" "path/filepath" "strings" "time" "rrbec_server/internal/models" "rrbec_server/internal/repository" "github.com/google/uuid" ) type Syncer struct { repo *repository.Repository djangoURL string masterUser string masterPass string accessToken string refreshToken string } func NewSyncer(repo *repository.Repository, djangoURL, masterUser, masterPass string) *Syncer { return &Syncer{ repo: repo, djangoURL: djangoURL, masterUser: masterUser, masterPass: masterPass, } } func (s *Syncer) Start() { go func() { // 1. Login to Django token, err := s.LoginToDjango() if err != nil { log.Printf("Initial Django login failed: %v", err) } else { s.masterPass = token // Use token for future requests // 2. Load Last Sync ID lastID := s.repo.GetLastSyncID() if lastID == 0 { log.Println("Database is empty or fresh. Starting initial sync...") s.InitialSync() } } // 3. Sync Loop (Bidirectional) for { log.Println("Sync cycle started...") s.processChangeLog() // Cloud -> Local s.pushLocalChanges() // Local -> Cloud time.Sleep(5 * time.Second) } }() } func (s *Syncer) ensureValidToken() { if s.accessToken == "" { token, err := s.LoginToDjango() if err != nil { log.Printf("Failed to login: %v", err) return } s.accessToken = token } } func (s *Syncer) ensureValidTokenWithRefresh() { s.ensureValidToken() if s.refreshToken == "" { return } resp := s.fetchFromDjangoRaw("products") if s.IsTokenExpired(resp) { if err := s.RefreshAccessToken(); err != nil { log.Printf("Token refresh failed, trying full login: %v", err) s.accessToken = "" s.ensureValidToken() } } } func (s *Syncer) LoginToDjango() (string, error) { url := fmt.Sprintf("%s/api/v1/token/", s.djangoURL) loginData := map[string]string{ "username": s.masterUser, "password": s.masterPass, } jsonData, _ := json.Marshal(loginData) resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData)) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("login failed with status: %d", resp.StatusCode) } var result struct { Access string `json:"access"` Refresh string `json:"refresh"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return "", err } s.accessToken = result.Access s.refreshToken = result.Refresh return result.Access, nil } func (s *Syncer) RefreshAccessToken() error { url := fmt.Sprintf("%s/api/v1/token/refresh/", s.djangoURL) refreshData := map[string]string{ "refresh": s.refreshToken, } jsonData, _ := json.Marshal(refreshData) resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData)) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) log.Printf("Token refresh failed: %s", string(body)) return fmt.Errorf("refresh failed with status: %d", resp.StatusCode) } var result struct { Access string `json:"access"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return err } s.accessToken = result.Access log.Println("Token refreshed successfully") return nil } func (s *Syncer) IsTokenExpired(responseBody []byte) bool { var errResp struct { Code string `json:"code"` } if json.Unmarshal(responseBody, &errResp) == nil { return errResp.Code == "token_not_valid" || strings.Contains(string(responseBody), "token_not_valid") } return false } func (s *Syncer) InitialSync() { // Sync Products var products []models.Product s.fetchFromDjango("products", &products) for _, p := range products { p.Sincronizado = true if p.UUID == "" { p.UUID = uuid.New().String() } s.repo.SaveProduct(&p) } // Sync Clients var clients []models.Client s.fetchFromDjango("clients", &clients) for _, c := range clients { c.Sincronizado = true if c.UUID == "" { c.UUID = uuid.New().String() } s.repo.SaveClient(&c) } // Sync Mesas var mesas []models.Mesa s.fetchFromDjango("mesas", &mesas) for _, m := range mesas { m.Sincronizado = true if m.UUID == "" { m.UUID = uuid.New().String() } s.repo.SaveMesa(&m) } // Sync Users var users []models.User s.fetchFromDjango("users", &users) for _, u := range users { u.Sincronizado = true if u.UUID == "" { u.UUID = uuid.New().String() } s.repo.SaveUser(&u) } // Sync Categories var categories []models.Category s.fetchFromDjango("categories", &categories) for _, cat := range categories { cat.Sincronizado = true if cat.UUID == "" { cat.UUID = uuid.New().String() } s.repo.SaveCategory(&cat) } // Sync Payment Types var paymentTypes []models.TypePay s.fetchFromDjango("payment-types", &paymentTypes) for _, tp := range paymentTypes { tp.Sincronizado = true if tp.UUID == "" { tp.UUID = uuid.New().String() } s.repo.SaveTypePay(&tp) } // Sync Comandas var comandas []models.Comanda s.fetchFromDjango("comandas", &comandas) for _, co := range comandas { co.Sincronizado = true if co.UUID == "" { co.UUID = uuid.New().String() } s.repo.SaveComanda(&co) // Sync nested items from the serializer for _, item := range co.Items { item.Sincronizado = true if item.UUID == "" { item.UUID = uuid.New().String() } s.repo.SaveProductComanda(&item) } } // Sync Comanda Items var items []models.ProductComanda s.fetchFromDjango("items-comanda", &items) for _, it := range items { it.Sincronizado = true if it.UUID == "" { it.UUID = uuid.New().String() } s.repo.SaveProductComanda(&it) } // Sync Orders var orders []models.Order s.fetchFromDjango("orders", &orders) for _, o := range orders { o.Sincronizado = true if o.UUID == "" { o.UUID = uuid.New().String() } s.repo.SaveOrder(&o) } // Sync Payments var payments []models.Payment s.fetchFromDjango("payments", &payments) for _, pay := range payments { pay.Sincronizado = true if pay.UUID == "" { pay.UUID = uuid.New().String() } s.repo.SavePayment(&pay) } log.Println("Initial sync completed successfully.") } func (s *Syncer) fetchFromDjango(endpoint string, target interface{}) { endpoint = strings.Trim(endpoint, "/") url := "" if strings.Contains(endpoint, "?") { url = fmt.Sprintf("%s/api/v1/%s", s.djangoURL, endpoint) } else { url = fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint) } token := s.accessToken if token == "" { token = s.masterPass } req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(req) if err != nil { log.Printf("Failed to fetch %s: %v", endpoint, err) return } defer resp.Body.Close() bodyBytes, _ := io.ReadAll(resp.Body) if err := json.Unmarshal(bodyBytes, target); err != nil { var paginated struct { Results json.RawMessage `json:"results"` } if err2 := json.Unmarshal(bodyBytes, &paginated); err2 == nil && paginated.Results != nil { if err3 := json.Unmarshal(paginated.Results, target); err3 != nil { log.Printf("Failed to decode paginated %s: %v", endpoint, err3) } } else { log.Printf("Failed to decode %s: %v", endpoint, err) } } log.Printf("Synchronized %s from cloud", endpoint) } func (s *Syncer) fetchFromDjangoRaw(endpoint string) []byte { endpoint = strings.Trim(endpoint, "/") url := "" if strings.Contains(endpoint, "?") { url = fmt.Sprintf("%s/api/v1/%s", s.djangoURL, endpoint) } else { url = fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint) } token := s.accessToken if token == "" { token = s.masterPass } req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(req) if err != nil { log.Printf("Failed to fetch %s: %v", endpoint, err) return nil } defer resp.Body.Close() bodyBytes, _ := io.ReadAll(resp.Body) if s.IsTokenExpired(bodyBytes) { log.Printf("Token expired, refreshing...") if err := s.RefreshAccessToken(); err != nil { log.Printf("Token refresh failed: %v", err) } else { return s.fetchFromDjangoRaw(endpoint) } } log.Printf("DEBUG: Raw response for %s: %s", endpoint, string(bodyBytes)) return bodyBytes } func (s *Syncer) processChangeLog() { lastID := s.repo.GetLastSyncID() url := fmt.Sprintf("sync?since_id=%d", lastID) var changes []models.ChangeLog respBody := s.fetchFromDjangoRaw(url) if respBody != nil { if err := json.Unmarshal(respBody, &changes); err != nil { log.Printf("ERROR: Failed to parse changelog: %v", err) return } } log.Printf("DEBUG: Received %d changes from Django", len(changes)) if len(changes) == 0 { return } maxID := lastID for _, change := range changes { log.Printf("Processing change %s for %s (ID: %d)", change.Action, change.ModelName, change.ObjectID) if change.Action == "SAVE" { s.fetchAndSaveObject(change.ModelName, change.ObjectID) } else if change.Action == "DELETE" { s.repo.DeleteByID(change.ModelName, change.ObjectID) } if change.ID > maxID { maxID = change.ID } } s.repo.SaveLastSyncID(maxID) } func (s *Syncer) fetchAndSaveObject(modelName string, id uint) { endpoint := "" switch modelName { case "Product": endpoint = "products" case "Comanda": endpoint = "comandas" case "ProductComanda": endpoint = "items-comanda" case "Order": endpoint = "orders" case "Client": endpoint = "clients" case "Categories": endpoint = "categories" case "Mesa": endpoint = "mesas" case "Payments": endpoint = "payments" default: return } url := fmt.Sprintf("%s/%d", endpoint, id) // Create temporary instance based on modelName switch modelName { case "Comanda": var obj models.Comanda s.fetchFromDjango(url, &obj) if obj.UUID == "" { var local models.Comanda if err := s.repo.GetComandaToSync(obj.ID, &local); err == nil { obj.UUID = local.UUID } if obj.UUID == "" { obj.UUID = uuid.New().String() } } obj.Sincronizado = true s.repo.SaveComanda(&obj) case "Product": var obj models.Product s.fetchFromDjango(url, &obj) if obj.UUID == "" { var local models.Product if err := s.repo.GetProductToSync(obj.ID, &local); err == nil { obj.UUID = local.UUID } if obj.UUID == "" { obj.UUID = uuid.New().String() } } obj.Sincronizado = true s.repo.SaveProduct(&obj) case "ProductComanda": var obj models.ProductComanda s.fetchFromDjango(url, &obj) if obj.UUID == "" { var local models.ProductComanda if err := s.repo.GetItemToSync(obj.ID, &local); err == nil { obj.UUID = local.UUID } if obj.UUID == "" { obj.UUID = uuid.New().String() } } obj.Sincronizado = true s.repo.SaveProductComanda(&obj) case "Order": var obj models.Order s.fetchFromDjango(url, &obj) if obj.UUID == "" { var local models.Order if err := s.repo.GetOrderToSync(obj.ID, &local); err == nil { obj.UUID = local.UUID } if obj.UUID == "" { obj.UUID = uuid.New().String() } } obj.Sincronizado = true s.repo.SaveOrder(&obj) case "Client": var obj models.Client s.fetchFromDjango(url, &obj) if obj.UUID == "" { obj.UUID = uuid.New().String() } obj.Sincronizado = true s.repo.SaveClient(&obj) case "Categories": var obj models.Category s.fetchFromDjango(url, &obj) if obj.UUID == "" { obj.UUID = uuid.New().String() } obj.Sincronizado = true s.repo.SaveCategory(&obj) case "Mesa": var obj models.Mesa s.fetchFromDjango(url, &obj) if obj.UUID == "" { obj.UUID = uuid.New().String() } obj.Sincronizado = true s.repo.SaveMesa(&obj) case "Payments": var obj models.Payment s.fetchFromDjango(url, &obj) if obj.UUID == "" { var local models.Payment if err := s.repo.GetPaymentToSync(obj.ID, &local); err == nil { obj.UUID = local.UUID } if obj.UUID == "" { obj.UUID = uuid.New().String() } } obj.Sincronizado = true s.repo.SavePayment(&obj) } } func (s *Syncer) pushLocalChanges() { // 1. Sync Comandas comandas, err := s.repo.GetUnsyncedComandas() if err == nil && len(comandas) > 0 { for _, c := range comandas { if c.MesaID == 0 { log.Printf("SKIP: Comanda %d has mesa 0. Correct data required.", c.ID) continue } // Capture local ID before we might change it oldID := c.ID comandaName := c.Name if comandaName == "" { comandaName = fmt.Sprintf("Comanda %d", oldID) } payload := map[string]interface{}{ "mesa": c.MesaID, "client": c.ClientID, "user": c.UserID, "status": c.Status, "name": comandaName, "uuid": c.UUID, } // Try PATCH first endpoint := fmt.Sprintf("comandas/%d", c.ID) respBody, err := s.sendRequest("PATCH", endpoint, payload) if err != nil && strings.Contains(err.Error(), "status: 404") { respBody, err = s.sendRequest("POST", "comandas", payload) } if err == nil { var created models.Comanda json.Unmarshal(respBody, &created) // CRITICAL: Update local ID and children safely if created.ID != 0 && created.ID != oldID { s.repo.SafeChangeID("comandas", oldID, created.ID) // Update items that were pointing to the old ID s.repo.UpdateFK("product_comandas", "comanda_id", oldID, created.ID) s.repo.UpdateFK("orders", "id_comanda", oldID, created.ID) s.repo.UpdateFK("payments", "comanda_id", oldID, created.ID) } s.repo.MarkAsSynced(&models.Comanda{}, created.ID) log.Printf("SUCCESS: Pushed Comanda %d (now %d) to cloud", oldID, created.ID) } else { log.Printf("ERROR pushing Comanda %d: %v", oldID, err) } } } // 2. Sync Items items, err := s.repo.GetUnsyncedItems() if err == nil && len(items) > 0 { for _, it := range items { oldID := it.ID payload := map[string]interface{}{ "comanda": it.ComandaID, "product": it.ProductID, "applicant": it.Applicant, "uuid": it.UUID, } endpoint := fmt.Sprintf("items-comanda/%d", it.ID) respBody, err := s.sendRequest("PATCH", endpoint, payload) if err != nil && strings.Contains(err.Error(), "status: 404") { respBody, err = s.sendRequest("POST", "items-comanda", payload) } if err == nil { var created models.ProductComanda json.Unmarshal(respBody, &created) if created.ID != 0 && created.ID != oldID { s.repo.SafeChangeID("product_comandas", oldID, created.ID) s.repo.UpdateFK("orders", "product_comanda_id", oldID, created.ID) } s.repo.MarkAsSynced(&models.ProductComanda{}, created.ID) log.Printf("SUCCESS: Pushed Item %d (now %d) to cloud", oldID, created.ID) } else { log.Printf("ERROR pushing Item %d: %v", oldID, err) } } } // 3. Sync Orders orders, err := s.repo.GetUnsyncedOrders() if err == nil && len(orders) > 0 { for _, o := range orders { oldID := o.ID payload := map[string]interface{}{ "productComanda": o.ProductComandaID, "id_product": o.ProductID, "id_comanda": o.ComandaID, "obs": o.Obs, "uuid": o.UUID, } // Optional: mapping timestamps if they exist if !o.Queue.IsZero() { payload["queue"] = o.Queue } if o.Preparing != nil { payload["preparing"] = o.Preparing } if o.Finished != nil { payload["finished"] = o.Finished } if o.Delivered != nil { payload["delivered"] = o.Delivered } if o.Canceled != nil { payload["canceled"] = o.Canceled } endpoint := fmt.Sprintf("orders/%d", o.ID) respBody, err := s.sendRequest("PATCH", endpoint, payload) if err != nil && strings.Contains(err.Error(), "status: 404") { respBody, err = s.sendRequest("POST", "orders", payload) } if err == nil { var created models.Order json.Unmarshal(respBody, &created) if created.ID != 0 && created.ID != oldID { s.repo.SafeChangeID("orders", oldID, created.ID) } s.repo.MarkAsSynced(&models.Order{}, created.ID) log.Printf("SUCCESS: Pushed Order %d (now %d) to cloud", oldID, created.ID) } else { log.Printf("ERROR pushing Order %d: %v", oldID, err) } } } // 4. Sync Payments payments, err := s.repo.GetUnsyncedPayments() if err == nil && len(payments) > 0 { for _, p := range payments { if p.TypePayID == 0 { log.Printf("SKIP: Payment %d has invalid type_pay 0.", p.ID) s.repo.MarkAsSynced(&models.Payment{}, p.ID) continue } oldID := p.ID payload := map[string]interface{}{ "value": p.Value, "type_pay": p.TypePayID, "comanda": p.ComandaID, "description": p.Description, "datetime": p.DateTime, "uuid": p.UUID, } if p.ClientID != nil { payload["client"] = *p.ClientID } endpoint := fmt.Sprintf("payments/%d", p.ID) respBody, err := s.sendRequest("PATCH", endpoint, payload) if err != nil && strings.Contains(err.Error(), "status: 404") { respBody, err = s.sendRequest("POST", "payments", payload) } if err == nil { var created models.Payment json.Unmarshal(respBody, &created) if created.ID != 0 && created.ID != oldID { s.repo.SafeChangeID("payments", oldID, created.ID) } s.repo.MarkAsSynced(&models.Payment{}, created.ID) log.Printf("SUCCESS: Pushed Payment %d (now %d) to cloud", oldID, created.ID) } else { log.Printf("ERROR pushing Payment %d: %v", oldID, err) } } } // 5. Sync Products products, err := s.repo.GetUnsyncedProducts() if err == nil && len(products) > 0 { for _, p := range products { oldID := p.ID payload := map[string]interface{}{ "name": p.Name, "description": p.Description, "image": p.Image, "price": p.Price, "quantity": p.Quantity, "category": p.CategoryID, "cuisine": p.Cuisine, "active": p.Active, "uuid": p.UUID, } if p.UnitOfMeasure != nil { payload["unit_of_measure"] = *p.UnitOfMeasure } endpoint := fmt.Sprintf("products/%d", p.ID) respBody, err := s.sendMultipartRequest("PATCH", endpoint, payload) if err != nil && strings.Contains(err.Error(), "status: 404") { respBody, err = s.sendMultipartRequest("POST", "products", payload) } if err == nil { var created models.Product json.Unmarshal(respBody, &created) if created.ID != 0 && created.ID != oldID { s.repo.SafeChangeID("products", oldID, created.ID) } s.repo.MarkAsSynced(&models.Product{}, created.ID) log.Printf("SUCCESS: Pushed Product %d (now %d) to cloud", oldID, created.ID) } else { log.Printf("ERROR pushing Product %d: %v", oldID, err) } } } } func (s *Syncer) sendRequest(method, endpoint string, data interface{}) ([]byte, error) { url := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint) jsonData, _ := json.Marshal(data) token := s.accessToken if token == "" { token = s.masterPass } req, _ := http.NewRequest(method, url, bytes.NewBuffer(jsonData)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode == 401 || strings.Contains(string(respBody), "token_not_valid") { log.Printf("Token expired during %s %s, refreshing...", method, endpoint) if err := s.RefreshAccessToken(); err != nil { return nil, fmt.Errorf("token refresh failed: %w", err) } return s.sendRequest(method, endpoint, data) } if resp.StatusCode >= 200 && resp.StatusCode < 300 { return respBody, nil } return nil, fmt.Errorf("django returned status: %d | Response: %s", resp.StatusCode, string(respBody)) } func (s *Syncer) sendMultipartRequest(method, endpoint string, data map[string]interface{}) ([]byte, error) { urlStr := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint) var b bytes.Buffer w := multipart.NewWriter(&b) for key, val := range data { if key == "image" { imageStr, ok := val.(string) if !ok || imageStr == "" { continue // Skip empty image } // If it's an existing URL, don't re-upload it if strings.HasPrefix(imageStr, "http") || strings.HasPrefix(imageStr, "/media/") || strings.HasPrefix(imageStr, "/images/") { continue } // Base64 if strings.HasPrefix(imageStr, "data:image") { parts := strings.Split(imageStr, ",") if len(parts) == 2 { decoded, err := base64.StdEncoding.DecodeString(parts[1]) if err == nil { fw, _ := w.CreateFormFile(key, "upload.jpg") fw.Write(decoded) } } continue } // Local file fileContent, err := os.ReadFile(imageStr) if err == nil { fw, _ := w.CreateFormFile(key, filepath.Base(imageStr)) fw.Write(fileContent) } } else { // standard text field w.WriteField(key, fmt.Sprintf("%v", val)) } } w.Close() token := s.accessToken if token == "" { token = s.masterPass } req, _ := http.NewRequest(method, urlStr, &b) req.Header.Set("Content-Type", w.FormDataContentType()) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{Timeout: 15 * time.Second} resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode == 401 || strings.Contains(string(respBody), "token_not_valid") { log.Printf("Token expired during %s %s, refreshing...", method, urlStr) if err := s.RefreshAccessToken(); err != nil { return nil, fmt.Errorf("token refresh failed: %w", err) } return s.sendMultipartRequest(method, endpoint, data) } if resp.StatusCode >= 200 && resp.StatusCode < 300 { return respBody, nil } return nil, fmt.Errorf("django returned status: %d | Response: %s", resp.StatusCode, string(respBody)) } func (s *Syncer) syncUsers() { // Optional: logic for background sync of users if needed }