new: feat: add parallelism

This commit is contained in:
2023-09-23 17:41:58 +00:00
parent faa721dc19
commit 8ac5ed3c2c
2 changed files with 89 additions and 58 deletions

View File

@@ -200,8 +200,11 @@ func getTorrents(ctx context.Context, i *Indexer, link string) ([]IndexedTorrent
} }
}) })
var chanIndexedTorrent = make(chan IndexedTorrent)
// for each magnet link, create a new indexed torrent // for each magnet link, create a new indexed torrent
for _, magnetLink := range magnetLinks { for _, magnetLink := range magnetLinks {
go func(magnetLink string) {
releaseTitle := extractReleaseName(magnetLink) releaseTitle := extractReleaseName(magnetLink)
magnetAudio := []schema.Audio{} magnetAudio := []schema.Audio{}
if strings.Contains(strings.ToLower(releaseTitle), "dual") { if strings.Contains(strings.ToLower(releaseTitle), "dual") {
@@ -228,7 +231,7 @@ func getTorrents(ctx context.Context, i *Indexer, link string) ([]IndexedTorrent
title := processTitle(title, magnetAudio) title := processTitle(title, magnetAudio)
indexedTorrents = append(indexedTorrents, IndexedTorrent{ it := IndexedTorrent{
Title: releaseTitle, Title: releaseTitle,
OriginalTitle: title, OriginalTitle: title,
Details: link, Details: link,
@@ -240,7 +243,14 @@ func getTorrents(ctx context.Context, i *Indexer, link string) ([]IndexedTorrent
Trackers: trackers, Trackers: trackers,
LeechCount: peer, LeechCount: peer,
SeedCount: seed, SeedCount: seed,
}) }
chanIndexedTorrent <- it
}(magnetLink)
}
for i := 0; i < len(magnetLinks); i++ {
it := <-chanIndexedTorrent
indexedTorrents = append(indexedTorrents, it)
} }
return indexedTorrents, nil return indexedTorrents, nil

View File

@@ -9,24 +9,29 @@ import (
"github.com/felipemarinho97/torrent-indexer/cache" "github.com/felipemarinho97/torrent-indexer/cache"
) )
type peers struct {
Seeders int `json:"seed"`
Leechers int `json:"leech"`
}
func getPeersFromCache(ctx context.Context, r *cache.Redis, infoHash string) (int, int, error) { func getPeersFromCache(ctx context.Context, r *cache.Redis, infoHash string) (int, int, error) {
// get peers and seeds from redis first // get peers and seeds from redis first
peersCache, err := r.Get(ctx, infoHash) peersCache, err := r.Get(ctx, infoHash)
if err == nil { if err == nil {
var peers map[string]int var peers peers
err = json.Unmarshal(peersCache, &peers) err = json.Unmarshal(peersCache, &peers)
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
return peers["leech"], peers["seed"], nil return peers.Leechers, peers.Seeders, nil
} }
return 0, 0, err return 0, 0, err
} }
func setPeersToCache(ctx context.Context, r *cache.Redis, infoHash string, peer, seed int) error { func setPeersToCache(ctx context.Context, r *cache.Redis, infoHash string, peer, seed int) error {
peers := map[string]int{ peers := peers{
"leech": peer, Seeders: seed,
"seed": seed, Leechers: peer,
} }
peersJSON, err := json.Marshal(peers) peersJSON, err := json.Marshal(peers)
if err != nil { if err != nil {
@@ -40,7 +45,6 @@ func setPeersToCache(ctx context.Context, r *cache.Redis, infoHash string, peer,
} }
func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, infoHash string, trackers []string) (int, int, error) { func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, infoHash string, trackers []string) (int, int, error) {
var leech, seed int
leech, seed, err := getPeersFromCache(ctx, r, infoHash) leech, seed, err := getPeersFromCache(ctx, r, infoHash)
if err != nil { if err != nil {
fmt.Println("unable to get peers from cache for infohash:", infoHash) fmt.Println("unable to get peers from cache for infohash:", infoHash)
@@ -49,12 +53,16 @@ func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, infoHash string, tra
return leech, seed, nil return leech, seed, nil
} }
var peerChan = make(chan peers)
var errChan = make(chan error)
for _, tracker := range trackers { for _, tracker := range trackers {
go func(tracker string) {
// get peers and seeds from redis first // get peers and seeds from redis first
scraper, err := New(tracker) scraper, err := New(tracker)
if err != nil { if err != nil {
fmt.Println(err) errChan <- err
continue return
} }
scraper.SetTimeout(500 * time.Millisecond) scraper.SetTimeout(500 * time.Millisecond)
@@ -62,14 +70,27 @@ func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, infoHash string, tra
// get peers and seeds from redis first // get peers and seeds from redis first
res, err := scraper.Scrape([]byte(infoHash)) res, err := scraper.Scrape([]byte(infoHash))
if err != nil { if err != nil {
fmt.Println(err) errChan <- err
continue return
} }
leech += int(res[0].Leechers) peerChan <- peers{
seed += int(res[0].Seeders) Seeders: int(res[0].Seeders),
setPeersToCache(ctx, r, infoHash, leech, seed) Leechers: int(res[0].Leechers),
return leech, seed, nil
} }
return leech, seed, nil }(tracker)
}
var peer peers
for i := 0; i < len(trackers); i++ {
select {
case peer = <-peerChan:
setPeersToCache(ctx, r, infoHash, peer.Leechers, peer.Seeders)
return peer.Leechers, peer.Seeders, nil
case err := <-errChan:
fmt.Println(err)
}
}
return 0, 0, fmt.Errorf("unable to get peers from trackers")
} }