From 8ac5ed3c2cc0e7d234f8b55407af6d7d44663913 Mon Sep 17 00:00:00 2001 From: Marinho Date: Sat, 23 Sep 2023 17:41:58 +0000 Subject: [PATCH] new: feat: add parallelism --- api/comando_torrents.go | 78 +++++++++++++++++++++++------------------ scrape/info.go | 69 +++++++++++++++++++++++------------- 2 files changed, 89 insertions(+), 58 deletions(-) diff --git a/api/comando_torrents.go b/api/comando_torrents.go index 668afc0..bc66a48 100644 --- a/api/comando_torrents.go +++ b/api/comando_torrents.go @@ -200,47 +200,57 @@ func getTorrents(ctx context.Context, i *Indexer, link string) ([]IndexedTorrent } }) + var chanIndexedTorrent = make(chan IndexedTorrent) + // for each magnet link, create a new indexed torrent for _, magnetLink := range magnetLinks { - releaseTitle := extractReleaseName(magnetLink) - magnetAudio := []schema.Audio{} - if strings.Contains(strings.ToLower(releaseTitle), "dual") { - magnetAudio = append(magnetAudio, audio...) - } else if len(audio) > 1 { - // remove portuguese audio, and append to magnetAudio - for _, a := range audio { - if a != schema.AudioPortuguese { - magnetAudio = append(magnetAudio, a) + go func(magnetLink string) { + releaseTitle := extractReleaseName(magnetLink) + magnetAudio := []schema.Audio{} + if strings.Contains(strings.ToLower(releaseTitle), "dual") { + magnetAudio = append(magnetAudio, audio...) + } else if len(audio) > 1 { + // remove portuguese audio, and append to magnetAudio + for _, a := range audio { + if a != schema.AudioPortuguese { + magnetAudio = append(magnetAudio, a) + } } + } else { + magnetAudio = append(magnetAudio, audio...) } - } else { - magnetAudio = append(magnetAudio, audio...) - } - // decode url encoded title - releaseTitle, _ = url.QueryUnescape(releaseTitle) + // decode url encoded title + releaseTitle, _ = url.QueryUnescape(releaseTitle) - infoHash := extractInfoHash(magnetLink) - trackers := extractTrackers(magnetLink) - peer, seed, err := goscrape.GetLeechsAndSeeds(ctx, i.redis, infoHash, trackers) - if err != nil { - fmt.Println(err) - } + infoHash := extractInfoHash(magnetLink) + trackers := extractTrackers(magnetLink) + peer, seed, err := goscrape.GetLeechsAndSeeds(ctx, i.redis, infoHash, trackers) + if err != nil { + fmt.Println(err) + } - title := processTitle(title, magnetAudio) + title := processTitle(title, magnetAudio) - indexedTorrents = append(indexedTorrents, IndexedTorrent{ - Title: releaseTitle, - OriginalTitle: title, - Details: link, - Year: year, - Audio: magnetAudio, - MagnetLink: magnetLink, - Date: date, - InfoHash: infoHash, - Trackers: trackers, - LeechCount: peer, - SeedCount: seed, - }) + it := IndexedTorrent{ + Title: releaseTitle, + OriginalTitle: title, + Details: link, + Year: year, + Audio: magnetAudio, + MagnetLink: magnetLink, + Date: date, + InfoHash: infoHash, + Trackers: trackers, + LeechCount: peer, + SeedCount: seed, + } + chanIndexedTorrent <- it + }(magnetLink) + } + + for i := 0; i < len(magnetLinks); i++ { + it := <-chanIndexedTorrent + indexedTorrents = append(indexedTorrents, it) } return indexedTorrents, nil diff --git a/scrape/info.go b/scrape/info.go index 48f65cd..87f2092 100644 --- a/scrape/info.go +++ b/scrape/info.go @@ -9,24 +9,29 @@ import ( "github.com/felipemarinho97/torrent-indexer/cache" ) +type peers struct { + Seeders int `json:"seed"` + Leechers int `json:"leech"` +} + func getPeersFromCache(ctx context.Context, r *cache.Redis, infoHash string) (int, int, error) { // get peers and seeds from redis first peersCache, err := r.Get(ctx, infoHash) if err == nil { - var peers map[string]int + var peers peers err = json.Unmarshal(peersCache, &peers) if err != nil { return 0, 0, err } - return peers["leech"], peers["seed"], nil + return peers.Leechers, peers.Seeders, nil } return 0, 0, err } func setPeersToCache(ctx context.Context, r *cache.Redis, infoHash string, peer, seed int) error { - peers := map[string]int{ - "leech": peer, - "seed": seed, + peers := peers{ + Seeders: seed, + Leechers: peer, } peersJSON, err := json.Marshal(peers) if err != nil { @@ -40,7 +45,6 @@ func setPeersToCache(ctx context.Context, r *cache.Redis, infoHash string, peer, } func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, infoHash string, trackers []string) (int, int, error) { - var leech, seed int leech, seed, err := getPeersFromCache(ctx, r, infoHash) if err != nil { fmt.Println("unable to get peers from cache for infohash:", infoHash) @@ -49,27 +53,44 @@ func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, infoHash string, tra return leech, seed, nil } + var peerChan = make(chan peers) + var errChan = make(chan error) + for _, tracker := range trackers { - // get peers and seeds from redis first - scraper, err := New(tracker) - if err != nil { - fmt.Println(err) - continue - } + go func(tracker string) { + // get peers and seeds from redis first + scraper, err := New(tracker) + if err != nil { + errChan <- err + return + } - scraper.SetTimeout(500 * time.Millisecond) + scraper.SetTimeout(500 * time.Millisecond) - // get peers and seeds from redis first - res, err := scraper.Scrape([]byte(infoHash)) - if err != nil { - fmt.Println(err) - continue - } + // get peers and seeds from redis first + res, err := scraper.Scrape([]byte(infoHash)) + if err != nil { + errChan <- err + return + } - leech += int(res[0].Leechers) - seed += int(res[0].Seeders) - setPeersToCache(ctx, r, infoHash, leech, seed) - return leech, seed, nil + peerChan <- peers{ + Seeders: int(res[0].Seeders), + Leechers: int(res[0].Leechers), + } + }(tracker) } - return leech, seed, nil + + var peer peers + for i := 0; i < len(trackers); i++ { + select { + case peer = <-peerChan: + setPeersToCache(ctx, r, infoHash, peer.Leechers, peer.Seeders) + return peer.Leechers, peer.Seeders, nil + case err := <-errChan: + fmt.Println(err) + } + } + + return 0, 0, fmt.Errorf("unable to get peers from trackers") }