Files
Welton Moura 936aad779b feat: RRBEC Local Server - Go backend with Django sync
- Implement local-first architecture with SQLite
- Add bidirectional sync with Django via ChangeLog
- JWT authentication with auto-refresh token
- REST API for products, orders, commands, payments
- Stock management with automatic deduction
2026-04-04 17:38:40 -03:00

872 lines
22 KiB
Go

package sync
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"rrbec_server/internal/models"
"rrbec_server/internal/repository"
"github.com/google/uuid"
)
// Syncer carries everything the sync routines need: the local repository, the
// Django base URL, the master credentials used to obtain tokens, and the JWT
// pair currently in use.
type Syncer struct {
	// repo is the local persistence layer the sync routines read and write.
	repo *repository.Repository

	// djangoURL is the base URL that the "/api/v1/..." paths are appended to.
	// masterUser and masterPass are the credentials posted to the token
	// endpoint; masterPass is also what the request helpers fall back to as
	// the bearer value when no access token is held.
	djangoURL, masterUser, masterPass string

	// accessToken is sent as the bearer token on API requests; refreshToken
	// is posted to the refresh endpoint to obtain a new access token.
	accessToken, refreshToken string
}
// NewSyncer returns a Syncer bound to repo that talks to the Django instance
// at djangoURL using the given master credentials. No token is obtained here;
// both token fields start out empty.
func NewSyncer(repo *repository.Repository, djangoURL, masterUser, masterPass string) *Syncer {
	syncer := new(Syncer)
	syncer.repo = repo
	syncer.djangoURL = djangoURL
	syncer.masterUser = masterUser
	syncer.masterPass = masterPass
	return syncer
}
// Start launches the background synchronization goroutine and returns
// immediately.
//
// The goroutine first logs in to Django and, when the repository reports a
// last sync ID of 0, runs InitialSync. It then loops forever: make sure an
// access token is held, pull the remote changelog, push local changes, and
// sleep five seconds. Nothing stops the loop; it runs until the process exits.
func (s *Syncer) Start() {
	go func() {
		// 1. Login to Django. LoginToDjango stores the token pair on the
		// Syncer itself, so masterPass is deliberately left untouched: it is
		// still needed as the password whenever a later re-login is required.
		if _, err := s.LoginToDjango(); err != nil {
			log.Printf("Initial Django login failed: %v", err)
		} else if s.repo.GetLastSyncID() == 0 {
			// 2. A last sync ID of 0 means nothing has been pulled yet.
			log.Println("Database is empty or fresh. Starting initial sync...")
			s.InitialSync()
		}

		// 3. Sync Loop (Bidirectional)
		for {
			// Retry the login when no access token is held (for example after
			// a failed initial login) instead of cycling unauthenticated.
			s.ensureValidToken()
			if s.accessToken == "" {
				time.Sleep(5 * time.Second)
				continue
			}

			log.Println("Sync cycle started...")
			s.processChangeLog() // Cloud -> Local
			s.pushLocalChanges() // Local -> Cloud
			time.Sleep(5 * time.Second)
		}
	}()
}
// ensureValidToken logs in to Django when no access token is currently held.
// A failed login is logged and leaves the token empty; an existing token is
// kept as is without being checked against the server.
func (s *Syncer) ensureValidToken() {
	if s.accessToken != "" {
		return
	}

	access, loginErr := s.LoginToDjango()
	if loginErr != nil {
		log.Printf("Failed to login: %v", loginErr)
		return
	}
	s.accessToken = access
}
// ensureValidTokenWithRefresh makes sure a token is held and, when a refresh
// token is available, probes the "products" endpoint to see whether the access
// token is still accepted. If the probe reports an expired token, the token is
// refreshed; if that refresh fails, the access token is cleared and a full
// login is attempted.
func (s *Syncer) ensureValidTokenWithRefresh() {
	s.ensureValidToken()
	if s.refreshToken == "" {
		return
	}

	probe := s.fetchFromDjangoRaw("products")
	if !s.IsTokenExpired(probe) {
		return
	}

	refreshErr := s.RefreshAccessToken()
	if refreshErr == nil {
		return
	}

	log.Printf("Token refresh failed, trying full login: %v", refreshErr)
	s.accessToken = ""
	s.ensureValidToken()
}
// LoginToDjango posts the master credentials to the Django token endpoint
// ("/api/v1/token/") and stores the returned access and refresh tokens on the
// Syncer.
//
// It returns the access token. An error is returned when the request fails,
// the server answers with anything other than 200, the body cannot be decoded,
// or the response carries no access token; in all of those cases the tokens
// already held are left unchanged.
func (s *Syncer) LoginToDjango() (string, error) {
	url := fmt.Sprintf("%s/api/v1/token/", s.djangoURL)

	jsonData, err := json.Marshal(map[string]string{
		"username": s.masterUser,
		"password": s.masterPass,
	})
	if err != nil {
		return "", fmt.Errorf("encoding login payload: %w", err)
	}

	// The default http client has no timeout; a stalled server would block
	// the caller forever, so use a bounded client like the other requests do.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(url, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("login failed with status: %d", resp.StatusCode)
	}

	var result struct {
		Access  string `json:"access"`
		Refresh string `json:"refresh"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	// A 200 without a token is not a usable session; report it rather than
	// silently wiping the tokens currently held.
	if result.Access == "" {
		return "", fmt.Errorf("login response did not contain an access token")
	}

	s.accessToken = result.Access
	s.refreshToken = result.Refresh
	return result.Access, nil
}
// RefreshAccessToken posts the stored refresh token to the Django refresh
// endpoint ("/api/v1/token/refresh/") and stores the new access token.
//
// When the response also carries a "refresh" value (servers configured to
// rotate refresh tokens return one), it replaces the stored refresh token so
// the next refresh does not reuse a retired token. An error is returned when
// the request fails, the status is not 200, the body cannot be decoded, or no
// access token is present; the stored tokens are unchanged in those cases.
func (s *Syncer) RefreshAccessToken() error {
	url := fmt.Sprintf("%s/api/v1/token/refresh/", s.djangoURL)

	jsonData, err := json.Marshal(map[string]string{
		"refresh": s.refreshToken,
	})
	if err != nil {
		return fmt.Errorf("encoding refresh payload: %w", err)
	}

	// Bounded client: the default one has no timeout and could stall the
	// caller indefinitely.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(url, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body) // best effort: the body is only used for the log line
		log.Printf("Token refresh failed: %s", string(body))
		return fmt.Errorf("refresh failed with status: %d", resp.StatusCode)
	}

	var result struct {
		Access  string `json:"access"`
		Refresh string `json:"refresh"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return err
	}
	if result.Access == "" {
		return fmt.Errorf("refresh response did not contain an access token")
	}

	s.accessToken = result.Access
	if result.Refresh != "" {
		s.refreshToken = result.Refresh
	}
	log.Println("Token refreshed successfully")
	return nil
}
// IsTokenExpired reports whether responseBody is a token rejection.
//
// The body must decode as a JSON value compatible with an object whose "code"
// field, if present, is a string; anything else (arrays, plain text, empty
// input) yields false. A decodable body counts as a rejection when its "code"
// equals "token_not_valid" or when that marker appears anywhere in the raw
// text.
func (s *Syncer) IsTokenExpired(responseBody []byte) bool {
	var payload struct {
		Code string `json:"code"`
	}
	if decodeErr := json.Unmarshal(responseBody, &payload); decodeErr != nil {
		return false
	}

	const marker = "token_not_valid"
	if payload.Code == marker {
		return true
	}
	return strings.Contains(string(responseBody), marker)
}
// InitialSync downloads every collection from Django and stores it locally.
//
// Collections are fetched in a fixed order: products, clients, mesas, users,
// categories, payment types, comandas (including the items nested in each
// comanda), comanda items, orders and payments. Every record is flagged as
// synchronized and given a freshly generated UUID when it arrives without
// one. Fetch failures are only logged by fetchFromDjango, so the closing log
// line is printed regardless of how many collections actually arrived.
func (s *Syncer) InitialSync() {
	// Products
	var products []models.Product
	s.fetchFromDjango("products", &products)
	for i := range products {
		product := &products[i]
		product.Sincronizado = true
		if product.UUID == "" {
			product.UUID = uuid.New().String()
		}
		s.repo.SaveProduct(product)
	}

	// Clients
	var clients []models.Client
	s.fetchFromDjango("clients", &clients)
	for i := range clients {
		client := &clients[i]
		client.Sincronizado = true
		if client.UUID == "" {
			client.UUID = uuid.New().String()
		}
		s.repo.SaveClient(client)
	}

	// Mesas
	var mesas []models.Mesa
	s.fetchFromDjango("mesas", &mesas)
	for i := range mesas {
		mesa := &mesas[i]
		mesa.Sincronizado = true
		if mesa.UUID == "" {
			mesa.UUID = uuid.New().String()
		}
		s.repo.SaveMesa(mesa)
	}

	// Users
	var users []models.User
	s.fetchFromDjango("users", &users)
	for i := range users {
		user := &users[i]
		user.Sincronizado = true
		if user.UUID == "" {
			user.UUID = uuid.New().String()
		}
		s.repo.SaveUser(user)
	}

	// Categories
	var categories []models.Category
	s.fetchFromDjango("categories", &categories)
	for i := range categories {
		category := &categories[i]
		category.Sincronizado = true
		if category.UUID == "" {
			category.UUID = uuid.New().String()
		}
		s.repo.SaveCategory(category)
	}

	// Payment types
	var paymentTypes []models.TypePay
	s.fetchFromDjango("payment-types", &paymentTypes)
	for i := range paymentTypes {
		typePay := &paymentTypes[i]
		typePay.Sincronizado = true
		if typePay.UUID == "" {
			typePay.UUID = uuid.New().String()
		}
		s.repo.SaveTypePay(typePay)
	}

	// Comandas, followed by the items the serializer nests inside each one
	var comandas []models.Comanda
	s.fetchFromDjango("comandas", &comandas)
	for i := range comandas {
		comanda := &comandas[i]
		comanda.Sincronizado = true
		if comanda.UUID == "" {
			comanda.UUID = uuid.New().String()
		}
		s.repo.SaveComanda(comanda)

		for j := range comanda.Items {
			nested := &comanda.Items[j]
			nested.Sincronizado = true
			if nested.UUID == "" {
				nested.UUID = uuid.New().String()
			}
			s.repo.SaveProductComanda(nested)
		}
	}

	// Comanda items
	var items []models.ProductComanda
	s.fetchFromDjango("items-comanda", &items)
	for i := range items {
		item := &items[i]
		item.Sincronizado = true
		if item.UUID == "" {
			item.UUID = uuid.New().String()
		}
		s.repo.SaveProductComanda(item)
	}

	// Orders
	var orders []models.Order
	s.fetchFromDjango("orders", &orders)
	for i := range orders {
		order := &orders[i]
		order.Sincronizado = true
		if order.UUID == "" {
			order.UUID = uuid.New().String()
		}
		s.repo.SaveOrder(order)
	}

	// Payments
	var payments []models.Payment
	s.fetchFromDjango("payments", &payments)
	for i := range payments {
		payment := &payments[i]
		payment.Sincronizado = true
		if payment.UUID == "" {
			payment.UUID = uuid.New().String()
		}
		s.repo.SavePayment(payment)
	}

	log.Println("Initial sync completed successfully.")
}
// fetchFromDjango GETs "/api/v1/<endpoint>" from Django and decodes the JSON
// response into target.
//
// endpoint is given without the API prefix; surrounding slashes are trimmed
// and a trailing slash is appended unless the endpoint carries a query string.
// The body is decoded directly into target first; when that fails, it is
// treated as a paginated envelope and its "results" member is decoded instead
// (only the page that was returned is read).
//
// Failures are logged, not returned. target is left untouched when the request
// fails or the server answers with a non-2xx status, so an error payload such
// as {"detail": "Not found."} is never mistaken for a record.
func (s *Syncer) fetchFromDjango(endpoint string, target interface{}) {
	endpoint = strings.Trim(endpoint, "/")
	url := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint)
	if strings.Contains(endpoint, "?") {
		// A trailing slash after the query string would corrupt the last
		// parameter value.
		url = fmt.Sprintf("%s/api/v1/%s", s.djangoURL, endpoint)
	}

	token := s.accessToken
	if token == "" {
		token = s.masterPass
	}

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		log.Printf("Failed to build request for %s: %v", endpoint, err)
		return
	}
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("Failed to fetch %s: %v", endpoint, err)
		return
	}
	defer resp.Body.Close()

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Printf("Failed to read %s response: %v", endpoint, err)
		return
	}

	// Error responses are JSON objects too and would decode "successfully"
	// into a struct target, leaving the caller with a zero-valued record.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		log.Printf("Failed to fetch %s: django returned status %d", endpoint, resp.StatusCode)
		return
	}

	if err := json.Unmarshal(bodyBytes, target); err != nil {
		var paginated struct {
			Results json.RawMessage `json:"results"`
		}
		if err2 := json.Unmarshal(bodyBytes, &paginated); err2 != nil || paginated.Results == nil {
			log.Printf("Failed to decode %s: %v", endpoint, err)
			return
		}
		if err3 := json.Unmarshal(paginated.Results, target); err3 != nil {
			log.Printf("Failed to decode paginated %s: %v", endpoint, err3)
			return
		}
	}

	log.Printf("Synchronized %s from cloud", endpoint)
}
// fetchFromDjangoRaw GETs "/api/v1/<endpoint>" from Django and returns the raw
// response body, whatever the HTTP status was.
//
// endpoint is normalized the same way as in fetchFromDjango. When the body is
// a token rejection, the access token is refreshed and the request is repeated
// exactly once; if the token is still rejected, or the refresh fails, the
// rejection body is returned so the caller can see it. nil is returned when
// the request cannot be built, sent, or read.
func (s *Syncer) fetchFromDjangoRaw(endpoint string) []byte {
	endpoint = strings.Trim(endpoint, "/")
	url := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint)
	if strings.Contains(endpoint, "?") {
		url = fmt.Sprintf("%s/api/v1/%s", s.djangoURL, endpoint)
	}

	client := &http.Client{Timeout: 30 * time.Second}

	var bodyBytes []byte
	// At most two attempts: the original request plus one retry after a
	// refresh. Retrying without a bound would recurse forever whenever the
	// server keeps rejecting the refreshed token.
	for attempt := 0; attempt < 2; attempt++ {
		token := s.accessToken
		if token == "" {
			token = s.masterPass
		}

		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			log.Printf("Failed to build request for %s: %v", endpoint, err)
			return nil
		}
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

		resp, err := client.Do(req)
		if err != nil {
			log.Printf("Failed to fetch %s: %v", endpoint, err)
			return nil
		}
		bodyBytes, err = io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			log.Printf("Failed to read %s response: %v", endpoint, err)
			return nil
		}

		if !s.IsTokenExpired(bodyBytes) {
			break
		}
		if attempt > 0 {
			log.Printf("Token still rejected for %s after refresh", endpoint)
			break
		}

		log.Printf("Token expired, refreshing...")
		if err := s.RefreshAccessToken(); err != nil {
			log.Printf("Token refresh failed: %v", err)
			break
		}
	}

	log.Printf("DEBUG: Raw response for %s: %s", endpoint, string(bodyBytes))
	return bodyBytes
}
// processChangeLog pulls the remote changes recorded after the last
// synchronized changelog ID and applies them locally (cloud -> local).
//
// "SAVE" entries are re-fetched from Django and stored; "DELETE" entries
// remove the local row; other actions are ignored. Once every entry has been
// visited, the highest changelog ID seen is persisted as the new last sync ID.
// An unparsable response aborts the run without moving that ID.
func (s *Syncer) processChangeLog() {
	sinceID := s.repo.GetLastSyncID()

	var changes []models.ChangeLog
	if raw := s.fetchFromDjangoRaw(fmt.Sprintf("sync?since_id=%d", sinceID)); raw != nil {
		if parseErr := json.Unmarshal(raw, &changes); parseErr != nil {
			log.Printf("ERROR: Failed to parse changelog: %v", parseErr)
			return
		}
	}

	log.Printf("DEBUG: Received %d changes from Django", len(changes))
	if len(changes) == 0 {
		return
	}

	newestID := sinceID
	for idx := range changes {
		entry := changes[idx]
		log.Printf("Processing change %s for %s (ID: %d)", entry.Action, entry.ModelName, entry.ObjectID)

		switch entry.Action {
		case "SAVE":
			s.fetchAndSaveObject(entry.ModelName, entry.ObjectID)
		case "DELETE":
			s.repo.DeleteByID(entry.ModelName, entry.ObjectID)
		}

		if entry.ID > newestID {
			newestID = entry.ID
		}
	}

	s.repo.SaveLastSyncID(newestID)
}
// fetchAndSaveObject downloads one object from Django and stores it locally.
//
// modelName is the model name as it appears in the changelog ("Product",
// "Comanda", "ProductComanda", "Order", "Client", "Categories", "Mesa",
// "Payments"); any other name is ignored. id is the object's remote ID.
//
// When the fetched object carries no UUID, the UUID of the matching local row
// is reused where a lookup exists, otherwise a new one is generated. The
// object is stored flagged as synchronized.
//
// fetchFromDjango only logs failures, so a failed fetch leaves the object
// zero-valued. For the models below that check obj.ID, an ID of 0 is treated
// as "nothing was fetched" and the save is skipped instead of writing a blank
// record; this relies on Django including "id" in its responses, which the
// push code already depends on.
func (s *Syncer) fetchAndSaveObject(modelName string, id uint) {
	// detail builds the endpoint of this object under the given collection.
	detail := func(collection string) string {
		return fmt.Sprintf("%s/%d", collection, id)
	}

	switch modelName {
	case "Comanda":
		var obj models.Comanda
		s.fetchFromDjango(detail("comandas"), &obj)
		if obj.ID == 0 {
			log.Printf("SKIP: %s %d could not be fetched from Django", modelName, id)
			return
		}
		if obj.UUID == "" {
			var local models.Comanda
			if err := s.repo.GetComandaToSync(obj.ID, &local); err == nil {
				obj.UUID = local.UUID
			}
			if obj.UUID == "" {
				obj.UUID = uuid.New().String()
			}
		}
		obj.Sincronizado = true
		s.repo.SaveComanda(&obj)

	case "Product":
		var obj models.Product
		s.fetchFromDjango(detail("products"), &obj)
		if obj.ID == 0 {
			log.Printf("SKIP: %s %d could not be fetched from Django", modelName, id)
			return
		}
		if obj.UUID == "" {
			var local models.Product
			if err := s.repo.GetProductToSync(obj.ID, &local); err == nil {
				obj.UUID = local.UUID
			}
			if obj.UUID == "" {
				obj.UUID = uuid.New().String()
			}
		}
		obj.Sincronizado = true
		s.repo.SaveProduct(&obj)

	case "ProductComanda":
		var obj models.ProductComanda
		s.fetchFromDjango(detail("items-comanda"), &obj)
		if obj.ID == 0 {
			log.Printf("SKIP: %s %d could not be fetched from Django", modelName, id)
			return
		}
		if obj.UUID == "" {
			var local models.ProductComanda
			if err := s.repo.GetItemToSync(obj.ID, &local); err == nil {
				obj.UUID = local.UUID
			}
			if obj.UUID == "" {
				obj.UUID = uuid.New().String()
			}
		}
		obj.Sincronizado = true
		s.repo.SaveProductComanda(&obj)

	case "Order":
		var obj models.Order
		s.fetchFromDjango(detail("orders"), &obj)
		if obj.ID == 0 {
			log.Printf("SKIP: %s %d could not be fetched from Django", modelName, id)
			return
		}
		if obj.UUID == "" {
			var local models.Order
			if err := s.repo.GetOrderToSync(obj.ID, &local); err == nil {
				obj.UUID = local.UUID
			}
			if obj.UUID == "" {
				obj.UUID = uuid.New().String()
			}
		}
		obj.Sincronizado = true
		s.repo.SaveOrder(&obj)

	// NOTE(review): Client, Category and Mesa are saved without the ID guard
	// because their ID field is not referenced anywhere in this file; if they
	// expose one, the same "ID == 0 means not fetched" check should be added.
	case "Client":
		var obj models.Client
		s.fetchFromDjango(detail("clients"), &obj)
		if obj.UUID == "" {
			obj.UUID = uuid.New().String()
		}
		obj.Sincronizado = true
		s.repo.SaveClient(&obj)

	case "Categories":
		var obj models.Category
		s.fetchFromDjango(detail("categories"), &obj)
		if obj.UUID == "" {
			obj.UUID = uuid.New().String()
		}
		obj.Sincronizado = true
		s.repo.SaveCategory(&obj)

	case "Mesa":
		var obj models.Mesa
		s.fetchFromDjango(detail("mesas"), &obj)
		if obj.UUID == "" {
			obj.UUID = uuid.New().String()
		}
		obj.Sincronizado = true
		s.repo.SaveMesa(&obj)

	case "Payments":
		var obj models.Payment
		s.fetchFromDjango(detail("payments"), &obj)
		if obj.ID == 0 {
			log.Printf("SKIP: %s %d could not be fetched from Django", modelName, id)
			return
		}
		if obj.UUID == "" {
			var local models.Payment
			if err := s.repo.GetPaymentToSync(obj.ID, &local); err == nil {
				obj.UUID = local.UUID
			}
			if obj.UUID == "" {
				obj.UUID = uuid.New().String()
			}
		}
		obj.Sincronizado = true
		s.repo.SavePayment(&obj)
	}
}
// pushLocalChanges uploads every locally modified record to Django
// (local -> cloud), one entity type at a time: comandas, comanda items,
// orders, payments, then products.
//
// Each record is first PATCHed at its local ID and, when Django answers 404,
// POSTed as a new object. When Django reports a different ID for the record,
// the local row (and, for comandas and items, the rows referencing it) is
// moved to that ID. The record is then marked as synchronized. Failures are
// logged per record and do not stop the remaining uploads.
func (s *Syncer) pushLocalChanges() {
	s.pushComandas()
	s.pushItems()
	s.pushOrders()
	s.pushPayments()
	s.pushProducts()
}

// pushComandas uploads unsynced comandas; comandas without a mesa are skipped.
func (s *Syncer) pushComandas() {
	comandas, err := s.repo.GetUnsyncedComandas()
	if err != nil {
		log.Printf("ERROR loading unsynced comandas: %v", err)
		return
	}

	for _, c := range comandas {
		if c.MesaID == 0 {
			log.Printf("SKIP: Comanda %d has mesa 0. Correct data required.", c.ID)
			continue
		}

		// Capture the local ID before it may be replaced by the remote one.
		oldID := c.ID
		comandaName := c.Name
		if comandaName == "" {
			comandaName = fmt.Sprintf("Comanda %d", oldID)
		}
		payload := map[string]interface{}{
			"mesa":   c.MesaID,
			"client": c.ClientID,
			"user":   c.UserID,
			"status": c.Status,
			"name":   comandaName,
			"uuid":   c.UUID,
		}

		// Try PATCH first; fall back to POST when Django does not know the ID.
		respBody, err := s.sendRequest("PATCH", fmt.Sprintf("comandas/%d", oldID), payload)
		if err != nil && strings.Contains(err.Error(), "status: 404") {
			respBody, err = s.sendRequest("POST", "comandas", payload)
		}
		if err != nil {
			log.Printf("ERROR pushing Comanda %d: %v", oldID, err)
			continue
		}

		var created models.Comanda
		if err := json.Unmarshal(respBody, &created); err != nil {
			log.Printf("WARN: could not decode Django response for Comanda %d: %v", oldID, err)
		}
		// A response without a usable ID must not redirect the bookkeeping to
		// row 0; the request succeeded, so the local row keeps its ID.
		remoteID := created.ID
		if remoteID == 0 {
			remoteID = oldID
		}

		// CRITICAL: move the local row and everything pointing at it.
		if remoteID != oldID {
			s.repo.SafeChangeID("comandas", oldID, remoteID)
			s.repo.UpdateFK("product_comandas", "comanda_id", oldID, remoteID)
			s.repo.UpdateFK("orders", "id_comanda", oldID, remoteID)
			s.repo.UpdateFK("payments", "comanda_id", oldID, remoteID)
		}
		s.repo.MarkAsSynced(&models.Comanda{}, remoteID)
		log.Printf("SUCCESS: Pushed Comanda %d (now %d) to cloud", oldID, remoteID)
	}
}

// pushItems uploads unsynced comanda items.
func (s *Syncer) pushItems() {
	items, err := s.repo.GetUnsyncedItems()
	if err != nil {
		log.Printf("ERROR loading unsynced items: %v", err)
		return
	}

	for _, it := range items {
		oldID := it.ID
		payload := map[string]interface{}{
			"comanda":   it.ComandaID,
			"product":   it.ProductID,
			"applicant": it.Applicant,
			"uuid":      it.UUID,
		}

		respBody, err := s.sendRequest("PATCH", fmt.Sprintf("items-comanda/%d", oldID), payload)
		if err != nil && strings.Contains(err.Error(), "status: 404") {
			respBody, err = s.sendRequest("POST", "items-comanda", payload)
		}
		if err != nil {
			log.Printf("ERROR pushing Item %d: %v", oldID, err)
			continue
		}

		var created models.ProductComanda
		if err := json.Unmarshal(respBody, &created); err != nil {
			log.Printf("WARN: could not decode Django response for Item %d: %v", oldID, err)
		}
		remoteID := created.ID
		if remoteID == 0 {
			remoteID = oldID
		}

		if remoteID != oldID {
			s.repo.SafeChangeID("product_comandas", oldID, remoteID)
			s.repo.UpdateFK("orders", "product_comanda_id", oldID, remoteID)
		}
		s.repo.MarkAsSynced(&models.ProductComanda{}, remoteID)
		log.Printf("SUCCESS: Pushed Item %d (now %d) to cloud", oldID, remoteID)
	}
}

// pushOrders uploads unsynced orders, including whichever status timestamps
// are set.
func (s *Syncer) pushOrders() {
	orders, err := s.repo.GetUnsyncedOrders()
	if err != nil {
		log.Printf("ERROR loading unsynced orders: %v", err)
		return
	}

	for _, o := range orders {
		oldID := o.ID
		payload := map[string]interface{}{
			"productComanda": o.ProductComandaID,
			"id_product":     o.ProductID,
			"id_comanda":     o.ComandaID,
			"obs":            o.Obs,
			"uuid":           o.UUID,
		}
		// Timestamps are optional: only the ones that are set are sent.
		if !o.Queue.IsZero() {
			payload["queue"] = o.Queue
		}
		if o.Preparing != nil {
			payload["preparing"] = o.Preparing
		}
		if o.Finished != nil {
			payload["finished"] = o.Finished
		}
		if o.Delivered != nil {
			payload["delivered"] = o.Delivered
		}
		if o.Canceled != nil {
			payload["canceled"] = o.Canceled
		}

		respBody, err := s.sendRequest("PATCH", fmt.Sprintf("orders/%d", oldID), payload)
		if err != nil && strings.Contains(err.Error(), "status: 404") {
			respBody, err = s.sendRequest("POST", "orders", payload)
		}
		if err != nil {
			log.Printf("ERROR pushing Order %d: %v", oldID, err)
			continue
		}

		var created models.Order
		if err := json.Unmarshal(respBody, &created); err != nil {
			log.Printf("WARN: could not decode Django response for Order %d: %v", oldID, err)
		}
		remoteID := created.ID
		if remoteID == 0 {
			remoteID = oldID
		}

		if remoteID != oldID {
			s.repo.SafeChangeID("orders", oldID, remoteID)
		}
		s.repo.MarkAsSynced(&models.Order{}, remoteID)
		log.Printf("SUCCESS: Pushed Order %d (now %d) to cloud", oldID, remoteID)
	}
}

// pushPayments uploads unsynced payments. Payments without a payment type are
// not sent; they are marked as synchronized so they are not retried.
func (s *Syncer) pushPayments() {
	payments, err := s.repo.GetUnsyncedPayments()
	if err != nil {
		log.Printf("ERROR loading unsynced payments: %v", err)
		return
	}

	for _, p := range payments {
		if p.TypePayID == 0 {
			log.Printf("SKIP: Payment %d has invalid type_pay 0.", p.ID)
			s.repo.MarkAsSynced(&models.Payment{}, p.ID)
			continue
		}

		oldID := p.ID
		payload := map[string]interface{}{
			"value":       p.Value,
			"type_pay":    p.TypePayID,
			"comanda":     p.ComandaID,
			"description": p.Description,
			"datetime":    p.DateTime,
			"uuid":        p.UUID,
		}
		if p.ClientID != nil {
			payload["client"] = *p.ClientID
		}

		respBody, err := s.sendRequest("PATCH", fmt.Sprintf("payments/%d", oldID), payload)
		if err != nil && strings.Contains(err.Error(), "status: 404") {
			respBody, err = s.sendRequest("POST", "payments", payload)
		}
		if err != nil {
			log.Printf("ERROR pushing Payment %d: %v", oldID, err)
			continue
		}

		var created models.Payment
		if err := json.Unmarshal(respBody, &created); err != nil {
			log.Printf("WARN: could not decode Django response for Payment %d: %v", oldID, err)
		}
		remoteID := created.ID
		if remoteID == 0 {
			remoteID = oldID
		}

		if remoteID != oldID {
			s.repo.SafeChangeID("payments", oldID, remoteID)
		}
		s.repo.MarkAsSynced(&models.Payment{}, remoteID)
		log.Printf("SUCCESS: Pushed Payment %d (now %d) to cloud", oldID, remoteID)
	}
}

// pushProducts uploads unsynced products as multipart forms so that an image
// can travel with the other fields.
func (s *Syncer) pushProducts() {
	products, err := s.repo.GetUnsyncedProducts()
	if err != nil {
		log.Printf("ERROR loading unsynced products: %v", err)
		return
	}

	for _, p := range products {
		oldID := p.ID
		payload := map[string]interface{}{
			"name":        p.Name,
			"description": p.Description,
			"image":       p.Image,
			"price":       p.Price,
			"quantity":    p.Quantity,
			"category":    p.CategoryID,
			"cuisine":     p.Cuisine,
			"active":      p.Active,
			"uuid":        p.UUID,
		}
		if p.UnitOfMeasure != nil {
			payload["unit_of_measure"] = *p.UnitOfMeasure
		}

		respBody, err := s.sendMultipartRequest("PATCH", fmt.Sprintf("products/%d", oldID), payload)
		if err != nil && strings.Contains(err.Error(), "status: 404") {
			respBody, err = s.sendMultipartRequest("POST", "products", payload)
		}
		if err != nil {
			log.Printf("ERROR pushing Product %d: %v", oldID, err)
			continue
		}

		var created models.Product
		if err := json.Unmarshal(respBody, &created); err != nil {
			log.Printf("WARN: could not decode Django response for Product %d: %v", oldID, err)
		}
		remoteID := created.ID
		if remoteID == 0 {
			remoteID = oldID
		}

		if remoteID != oldID {
			s.repo.SafeChangeID("products", oldID, remoteID)
		}
		s.repo.MarkAsSynced(&models.Product{}, remoteID)
		log.Printf("SUCCESS: Pushed Product %d (now %d) to cloud", oldID, remoteID)
	}
}
// sendRequest sends data as a JSON body to "/api/v1/<endpoint>/" on Django
// using the given HTTP method and returns the response body of a 2xx answer.
//
// When the answer is a 401 or mentions "token_not_valid", the access token is
// refreshed and the request is repeated exactly once. Any other non-2xx
// answer, including a rejection that persists after the refresh, is returned
// as an error of the form "django returned status: <code> | Response: <body>",
// which callers match on to detect a 404.
func (s *Syncer) sendRequest(method, endpoint string, data interface{}) ([]byte, error) {
	url := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint)

	jsonData, err := json.Marshal(data)
	if err != nil {
		return nil, fmt.Errorf("encoding payload for %s %s: %w", method, endpoint, err)
	}

	client := &http.Client{Timeout: 10 * time.Second}

	// attempt 0 is the original request, attempt 1 the single retry after a
	// token refresh. Retrying without a bound would recurse forever when the
	// server keeps answering 401.
	for attempt := 0; ; attempt++ {
		token := s.accessToken
		if token == "" {
			token = s.masterPass
		}

		req, err := http.NewRequest(method, url, bytes.NewReader(jsonData))
		if err != nil {
			return nil, fmt.Errorf("building %s %s request: %w", method, endpoint, err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		respBody, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return nil, fmt.Errorf("reading %s %s response: %w", method, endpoint, err)
		}

		rejected := resp.StatusCode == http.StatusUnauthorized ||
			strings.Contains(string(respBody), "token_not_valid")
		if rejected && attempt == 0 {
			log.Printf("Token expired during %s %s, refreshing...", method, endpoint)
			if err := s.RefreshAccessToken(); err != nil {
				return nil, fmt.Errorf("token refresh failed: %w", err)
			}
			continue
		}

		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			return respBody, nil
		}
		return nil, fmt.Errorf("django returned status: %d | Response: %s", resp.StatusCode, string(respBody))
	}
}
// sendMultipartRequest sends data as a multipart form to
// "/api/v1/<endpoint>/" on Django and returns the response body of a 2xx
// answer.
//
// Every key except "image" is written as a text field using its default
// formatting. The "image" value is handled by shape:
//   - empty or not a string: omitted;
//   - starting with "http", "/media/" or "/images/": treated as an image that
//     is already stored remotely and omitted;
//   - starting with "data:image": the base64 part after the single comma is
//     decoded and uploaded as "upload.jpg";
//   - anything else: read as a local file path and uploaded under its base
//     name.
//
// An image that cannot be decoded or read is logged and left out; the rest of
// the form is still sent.
//
// Token rejections are handled as in sendRequest: one refresh, one retry.
// Other non-2xx answers are returned as
// "django returned status: <code> | Response: <body>".
func (s *Syncer) sendMultipartRequest(method, endpoint string, data map[string]interface{}) ([]byte, error) {
	urlStr := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint)

	var b bytes.Buffer
	w := multipart.NewWriter(&b)
	for key, val := range data {
		if key != "image" {
			// standard text field
			if err := w.WriteField(key, fmt.Sprintf("%v", val)); err != nil {
				return nil, fmt.Errorf("writing form field %q: %w", key, err)
			}
			continue
		}

		imageStr, ok := val.(string)
		if !ok || imageStr == "" {
			continue // no image to send
		}
		// An existing URL means the image is already stored; don't re-upload.
		if strings.HasPrefix(imageStr, "http") || strings.HasPrefix(imageStr, "/media/") || strings.HasPrefix(imageStr, "/images/") {
			continue
		}

		var (
			content  []byte
			fileName string
		)
		if strings.HasPrefix(imageStr, "data:image") {
			// Data URL: "<header>,<base64 payload>".
			parts := strings.Split(imageStr, ",")
			if len(parts) != 2 {
				log.Printf("Skipping image for %s: malformed data URL", endpoint)
				continue
			}
			decoded, err := base64.StdEncoding.DecodeString(parts[1])
			if err != nil {
				log.Printf("Skipping image for %s: %v", endpoint, err)
				continue
			}
			content, fileName = decoded, "upload.jpg"
		} else {
			// Local file
			fileContent, err := os.ReadFile(imageStr)
			if err != nil {
				log.Printf("Skipping image for %s: %v", endpoint, err)
				continue
			}
			content, fileName = fileContent, filepath.Base(imageStr)
		}

		fw, err := w.CreateFormFile(key, fileName)
		if err != nil {
			return nil, fmt.Errorf("creating form file %q: %w", key, err)
		}
		if _, err := fw.Write(content); err != nil {
			return nil, fmt.Errorf("writing form file %q: %w", key, err)
		}
	}
	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("finalizing multipart body: %w", err)
	}

	// Keep the encoded form so the retry can resend it; a request body reader
	// is consumed by the first attempt.
	formBody := b.Bytes()
	contentType := w.FormDataContentType()
	client := &http.Client{Timeout: 15 * time.Second}

	// attempt 0 is the original request, attempt 1 the single retry after a
	// token refresh; an unbounded retry would never end on a persistent 401.
	for attempt := 0; ; attempt++ {
		token := s.accessToken
		if token == "" {
			token = s.masterPass
		}

		req, err := http.NewRequest(method, urlStr, bytes.NewReader(formBody))
		if err != nil {
			return nil, fmt.Errorf("building %s %s request: %w", method, endpoint, err)
		}
		req.Header.Set("Content-Type", contentType)
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		respBody, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return nil, fmt.Errorf("reading %s %s response: %w", method, endpoint, err)
		}

		rejected := resp.StatusCode == http.StatusUnauthorized ||
			strings.Contains(string(respBody), "token_not_valid")
		if rejected && attempt == 0 {
			log.Printf("Token expired during %s %s, refreshing...", method, urlStr)
			if err := s.RefreshAccessToken(); err != nil {
				return nil, fmt.Errorf("token refresh failed: %w", err)
			}
			continue
		}

		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			return respBody, nil
		}
		return nil, fmt.Errorf("django returned status: %d | Response: %s", resp.StatusCode, string(respBody))
	}
}
// syncUsers is a placeholder for a background synchronization of users. It
// currently does nothing.
func (s *Syncer) syncUsers() {
// Optional: logic for background sync of users if needed
}