perf: add pagination, filters, and fix Django sync pagination
This commit is contained in:
@@ -288,48 +288,76 @@ func (s *Syncer) InitialSync() {
|
||||
log.Println("Initial sync completed successfully.")
|
||||
}
|
||||
|
||||
func (s *Syncer) fetchFromDjango(endpoint string, target interface{}) {
|
||||
func (s *Syncer) fetchAllFromDjango(endpoint string, target interface{}) {
|
||||
endpoint = strings.Trim(endpoint, "/")
|
||||
|
||||
url := ""
|
||||
if strings.Contains(endpoint, "?") {
|
||||
url = fmt.Sprintf("%s/api/v1/%s", s.djangoURL, endpoint)
|
||||
} else {
|
||||
url = fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint)
|
||||
}
|
||||
url := fmt.Sprintf("%s/api/v1/%s/", s.djangoURL, endpoint)
|
||||
|
||||
token := s.accessToken
|
||||
if token == "" {
|
||||
token = s.masterPass
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("Failed to fetch %s: %v", endpoint, err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
allData := []json.RawMessage{}
|
||||
|
||||
bodyBytes, _ := io.ReadAll(resp.Body)
|
||||
for url != "" {
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
|
||||
|
||||
if err := json.Unmarshal(bodyBytes, target); err != nil {
|
||||
var paginated struct {
|
||||
Results json.RawMessage `json:"results"`
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("Failed to fetch %s: %v", endpoint, err)
|
||||
return
|
||||
}
|
||||
if err2 := json.Unmarshal(bodyBytes, &paginated); err2 == nil && paginated.Results != nil {
|
||||
if err3 := json.Unmarshal(paginated.Results, target); err3 != nil {
|
||||
log.Printf("Failed to decode paginated %s: %v", endpoint, err3)
|
||||
|
||||
bodyBytes, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
if s.IsTokenExpired(bodyBytes) {
|
||||
log.Printf("Token expired, refreshing...")
|
||||
if err := s.RefreshAccessToken(); err == nil {
|
||||
token = s.accessToken
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var paginated struct {
|
||||
Results json.RawMessage `json:"results"`
|
||||
Next *string `json:"next"`
|
||||
Previous *string `json:"previous"`
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(bodyBytes, &paginated); err == nil && paginated.Results != nil {
|
||||
var items []json.RawMessage
|
||||
if err := json.Unmarshal(paginated.Results, &items); err == nil {
|
||||
allData = append(allData, items...)
|
||||
}
|
||||
if paginated.Next != nil && *paginated.Next != "" {
|
||||
url = *paginated.Next
|
||||
} else {
|
||||
url = ""
|
||||
}
|
||||
} else {
|
||||
log.Printf("Failed to decode %s: %v", endpoint, err)
|
||||
var items []json.RawMessage
|
||||
if err := json.Unmarshal(bodyBytes, &items); err == nil {
|
||||
allData = append(allData, items...)
|
||||
}
|
||||
url = ""
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Synchronized %s from cloud", endpoint)
|
||||
if len(allData) > 0 {
|
||||
combined, _ := json.Marshal(allData)
|
||||
json.Unmarshal(combined, target)
|
||||
}
|
||||
|
||||
log.Printf("Synchronized %d %s from cloud", len(allData), endpoint)
|
||||
}
|
||||
|
||||
func (s *Syncer) fetchFromDjango(endpoint string, target interface{}) {
|
||||
s.fetchAllFromDjango(endpoint, target)
|
||||
}
|
||||
|
||||
func (s *Syncer) fetchFromDjangoRaw(endpoint string) []byte {
|
||||
@@ -866,6 +894,75 @@ func (s *Syncer) sendMultipartRequest(method, endpoint string, data map[string]i
|
||||
return nil, fmt.Errorf("django returned status: %d | Response: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
func (s *Syncer) SyncProductsOnly() {
|
||||
log.Println("Starting products-only sync...")
|
||||
|
||||
var products []models.Product
|
||||
s.fetchAllFromDjango("products", &products)
|
||||
log.Printf("Fetched %d products from Django", len(products))
|
||||
|
||||
// Build a map of cloud products by ID
|
||||
cloudMap := make(map[uint]models.Product)
|
||||
for _, p := range products {
|
||||
cloudMap[p.ID] = p
|
||||
}
|
||||
|
||||
// Get all local products
|
||||
var localProducts []models.Product
|
||||
s.repo.GetProductsAll(&localProducts)
|
||||
|
||||
localMap := make(map[uint]models.Product)
|
||||
for _, p := range localProducts {
|
||||
localMap[p.ID] = p
|
||||
}
|
||||
|
||||
created := 0
|
||||
updated := 0
|
||||
failed := 0
|
||||
for _, cp := range products {
|
||||
cp.Sincronizado = true
|
||||
if cp.UUID == "" {
|
||||
if lp, exists := localMap[cp.ID]; exists && lp.UUID != "" {
|
||||
cp.UUID = lp.UUID
|
||||
} else {
|
||||
cp.UUID = uuid.New().String()
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
if _, exists := localMap[cp.ID]; exists {
|
||||
err = s.repo.SaveProduct(&cp)
|
||||
if err == nil {
|
||||
updated++
|
||||
}
|
||||
} else {
|
||||
err = s.repo.CreateProduct(&cp)
|
||||
if err == nil {
|
||||
created++
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
failed++
|
||||
log.Printf("ERROR syncing product %d (%s): %v", cp.ID, cp.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete local products that no longer exist in cloud
|
||||
deleted := 0
|
||||
for _, lp := range localProducts {
|
||||
if _, exists := cloudMap[lp.ID]; !exists {
|
||||
if err := s.repo.DeleteByID("Product", lp.ID); err != nil {
|
||||
log.Printf("ERROR deleting product %d: %v", lp.ID, err)
|
||||
} else {
|
||||
deleted++
|
||||
log.Printf("Deleted local product %d (no longer in cloud)", lp.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Products sync complete: %d created, %d updated, %d deleted, %d failed", created, updated, deleted, failed)
|
||||
}
|
||||
|
||||
func (s *Syncer) syncUsers() {
|
||||
// Optional: logic for background sync of users if needed
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user