Feat/Flaresolverr support (#12)

* new: feat: add flaresolverr support

* chg: feat: add session pool

* chg: fix: deadlock error

* chg: fix: make it work without flaresolverr
This commit is contained in:
2024-09-24 18:31:58 -03:00
committed by GitHub
parent ced533cd40
commit a6a848b284
9 changed files with 357 additions and 21 deletions

View File

@@ -44,7 +44,7 @@ func (i *Indexer) HandlerBluDVIndexer(w http.ResponseWriter, r *http.Request) {
}
fmt.Println("URL:>", url)
resp, err := http.Get(url)
resp, err := i.requester.GetDocument(ctx, url)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
@@ -54,9 +54,9 @@ func (i *Indexer) HandlerBluDVIndexer(w http.ResponseWriter, r *http.Request) {
i.metrics.IndexerErrors.WithLabelValues("bludv").Inc()
return
}
defer resp.Body.Close()
defer resp.Close()
doc, err := goquery.NewDocumentFromReader(resp.Body)
doc, err := goquery.NewDocumentFromReader(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})

View File

@@ -60,7 +60,7 @@ func (i *Indexer) HandlerComandoIndexer(w http.ResponseWriter, r *http.Request)
}
fmt.Println("URL:>", url)
resp, err := http.Get(url)
resp, err := i.requester.GetDocument(ctx, url)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
@@ -70,9 +70,9 @@ func (i *Indexer) HandlerComandoIndexer(w http.ResponseWriter, r *http.Request)
i.metrics.IndexerErrors.WithLabelValues("comando").Inc()
return
}
defer resp.Body.Close()
defer resp.Close()
doc, err := goquery.NewDocumentFromReader(resp.Body)
doc, err := goquery.NewDocumentFromReader(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
@@ -405,17 +405,18 @@ func getDocument(ctx context.Context, i *Indexer, link string) (*goquery.Documen
docCache, err := i.redis.Get(ctx, link)
if err == nil {
i.metrics.CacheHits.WithLabelValues("document_body").Inc()
fmt.Printf("returning from long-lived cache: %s\n", link)
return goquery.NewDocumentFromReader(io.NopCloser(bytes.NewReader(docCache)))
}
defer i.metrics.CacheMisses.WithLabelValues("document_body").Inc()
resp, err := http.Get(link)
resp, err := i.requester.GetDocument(ctx, link)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer resp.Close()
body, err := io.ReadAll(resp.Body)
body, err := io.ReadAll(resp)
if err != nil {
return nil, err
}

View File

@@ -7,12 +7,14 @@ import (
"github.com/felipemarinho97/torrent-indexer/cache"
"github.com/felipemarinho97/torrent-indexer/monitoring"
"github.com/felipemarinho97/torrent-indexer/requester"
"github.com/felipemarinho97/torrent-indexer/schema"
)
type Indexer struct {
redis *cache.Redis
metrics *monitoring.Metrics
redis *cache.Redis
metrics *monitoring.Metrics
requester *requester.Requster
}
type IndexerMeta struct {
@@ -42,10 +44,11 @@ type IndexedTorrent struct {
Similarity float32 `json:"similarity"`
}
func NewIndexers(redis *cache.Redis, metrics *monitoring.Metrics) *Indexer {
func NewIndexers(redis *cache.Redis, metrics *monitoring.Metrics, req *requester.Requster) *Indexer {
return &Indexer{
redis: redis,
metrics: metrics,
redis: redis,
metrics: metrics,
requester: req,
}
}

2
cache/redis.go vendored
View File

@@ -10,7 +10,7 @@ import (
)
var (
DefaultExpiration = 24 * time.Hour * 180 // 180 days
DefaultExpiration = 24 * time.Hour * 7 // 7 days
IndexerComandoTorrents = "indexer:comando_torrents"
)

View File

@@ -11,6 +11,7 @@ services:
- indexer
environment:
- REDIS_HOST=redis
- FLARESOLVERR_ADDRESS=http://flaresolverr:8191
redis:
image: redis:alpine

10
main.go
View File

@@ -1,11 +1,14 @@
package main
import (
"fmt"
"net/http"
"os"
handler "github.com/felipemarinho97/torrent-indexer/api"
"github.com/felipemarinho97/torrent-indexer/cache"
"github.com/felipemarinho97/torrent-indexer/monitoring"
"github.com/felipemarinho97/torrent-indexer/requester"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
@@ -13,7 +16,10 @@ func main() {
redis := cache.NewRedis()
metrics := monitoring.NewMetrics()
metrics.Register()
indexers := handler.NewIndexers(redis, metrics)
flaresolverr := requester.NewFlareSolverr(os.Getenv("FLARESOLVERR_ADDRESS"), 60000)
req := requester.NewRequester(flaresolverr, redis)
indexers := handler.NewIndexers(redis, metrics, req)
indexerMux := http.NewServeMux()
metricsMux := http.NewServeMux()
@@ -31,7 +37,7 @@ func main() {
panic(err)
}
}()
fmt.Println("Server listening on :7006")
err := http.ListenAndServe(":7006", indexerMux)
if err != nil {
panic(err)

230
requester/flaresolverr.go Normal file
View File

@@ -0,0 +1,230 @@
package requester
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
)
type FlareSolverr struct {
url string
maxTimeout int
httpClient *http.Client
sessionPool chan string
mu sync.Mutex
initiated bool
}
func NewFlareSolverr(url string, timeoutMilli int) *FlareSolverr {
poolSize := 5
httpClient := &http.Client{}
sessionPool := make(chan string, poolSize) // Pool size of 5 sessions
f := &FlareSolverr{
url: url,
maxTimeout: timeoutMilli,
httpClient: httpClient,
sessionPool: sessionPool,
}
err := f.FillSessionPool()
if err == nil {
f.initiated = true
}
return f
}
func (f *FlareSolverr) FillSessionPool() error {
// Check if the pool is already filled
if len(f.sessionPool) == cap(f.sessionPool) {
return nil
}
// Pre-initialize the pool with existing sessions
sessions, err := f.ListSessions()
if err != nil {
fmt.Println("Failed to list existing FlareSolverr sessions:", err)
return err
} else {
for _, session := range sessions {
// Add available sessions to the pool
if len(f.sessionPool) < cap(f.sessionPool) {
f.sessionPool <- session
}
}
if len(f.sessionPool) > 0 {
fmt.Printf("Added %d FlareSolverr sessions to the pool\n", len(f.sessionPool))
}
}
// If fewer than poolSize sessions were found, create new ones to fill the pool
for len(f.sessionPool) < cap(f.sessionPool) {
f.CreateSession()
}
return nil
}
func (f *FlareSolverr) CreateSession() string {
f.mu.Lock()
defer f.mu.Unlock()
body := map[string]string{"cmd": "sessions.create"}
jsonBody, err := json.Marshal(body)
if err != nil {
return ""
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewBuffer(jsonBody))
if err != nil {
return ""
}
req.Header.Set("Content-Type", "application/json")
resp, err := f.httpClient.Do(req)
if err != nil {
return ""
}
defer resp.Body.Close()
var sessionResponse map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&sessionResponse)
if err != nil {
return ""
}
session := sessionResponse["session"].(string)
// Add session to the pool
f.sessionPool <- session
fmt.Println("Created new FlareSolverr session:", session)
return session
}
func (f *FlareSolverr) ListSessions() ([]string, error) {
body := map[string]string{"cmd": "sessions.list"}
jsonBody, err := json.Marshal(body)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewBuffer(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := f.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var sessionsResponse map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&sessionsResponse)
if err != nil {
return nil, err
}
sessions := sessionsResponse["sessions"].([]interface{})
var sessionIDs []string
for _, session := range sessions {
sessionIDs = append(sessionIDs, session.(string))
}
return sessionIDs, nil
}
func (f *FlareSolverr) RetrieveSession() string {
// Blocking receive from the session pool.
session := <-f.sessionPool
return session
}
type Response struct {
Status string `json:"status"`
Message string `json:"message"`
Solution struct {
Url string `json:"url"`
Status int `json:"status"`
Cookies []struct {
Domain string `json:"domain"`
Expiry int `json:"expiry"`
HttpOnly bool `json:"httpOnly"`
Name string `json:"name"`
Path string `json:"path"`
SameSite string `json:"sameSite"`
Secure bool `json:"secure"`
Value string `json:"value"`
} `json:"cookies"`
UserAgent string `json:"userAgent"`
Headers map[string]string `json:"headers"`
Response string `json:"response"`
} `json:"solution"`
}
func (f *FlareSolverr) Get(url string) (io.ReadCloser, error) {
// Check if the FlareSolverr instance was initiated
if !f.initiated {
return io.NopCloser(bytes.NewReader([]byte(""))), nil
}
// Retrieve session from the pool (blocking if no sessions available)
session := f.RetrieveSession()
// Ensure the session is returned to the pool after the request is done
defer func() {
f.sessionPool <- session
}()
body := map[string]string{
"cmd": "request.get",
"url": url,
"maxTimeout": fmt.Sprintf("%d", f.maxTimeout),
"session": session,
}
jsonBody, err := json.Marshal(body)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewBuffer(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := f.httpClient.Do(req)
if err != nil {
return nil, err
}
// Parse the response
var response Response
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return nil, err
}
// Check if the response was successful
if response.Status != "ok" {
return nil, fmt.Errorf("failed to get response: %s", response.Message)
}
// Check if "Under attack" is in the response
if strings.Contains(response.Solution.Response, "Under attack") {
return nil, fmt.Errorf("under attack")
}
// Return the response body
return io.NopCloser(bytes.NewReader([]byte(response.Solution.Response))), nil
}

93
requester/requester.go Normal file
View File

@@ -0,0 +1,93 @@
package requester
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"regexp"
"time"
"github.com/felipemarinho97/torrent-indexer/cache"
)
const (
shortLivedCacheExpiration = 30 * time.Minute
cacheKey = "shortLivedCache"
)
var challangeRegex = regexp.MustCompile(`(?i)(just a moment|cf-chl-bypass|under attack)`)
type Requster struct {
fs *FlareSolverr
c *cache.Redis
httpClient *http.Client
}
func NewRequester(fs *FlareSolverr, c *cache.Redis) *Requster {
return &Requster{fs: fs, httpClient: &http.Client{}, c: c}
}
func (i *Requster) GetDocument(ctx context.Context, url string) (io.ReadCloser, error) {
var body io.ReadCloser
// try request from short-lived cache
key := fmt.Sprintf("%s:%s", cacheKey, url)
bodyByte, err := i.c.Get(ctx, key)
if err == nil {
fmt.Printf("returning from short-lived cache: %s\n", url)
body = io.NopCloser(bytes.NewReader(bodyByte))
return body, nil
}
// try request with plain client
resp, err := i.httpClient.Get(url)
if err != nil {
// try request with flare solverr
body, err = i.fs.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to do request for url %s: %w", url, err)
}
} else {
defer resp.Body.Close()
body = resp.Body
}
bodyByte, err = io.ReadAll(body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
if hasChallange(bodyByte) {
// try request with flare solverr
body, err = i.fs.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to do request for url %s: %w", url, err)
}
bodyByte, err = io.ReadAll(body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
fmt.Printf("request served from flaresolverr: %s\n", url)
} else {
fmt.Printf("request served from plain client: %s\n", url)
}
// save response to cache if it's not a challange and body is not empty
if !hasChallange(bodyByte) && len(bodyByte) > 0 {
err = i.c.SetWithExpiration(ctx, key, bodyByte, shortLivedCacheExpiration)
if err != nil {
fmt.Printf("failed to save response to cache: %v\n", err)
}
fmt.Printf("saved to cache: %s\n", url)
} else {
return nil, fmt.Errorf("response is a challange")
}
return io.NopCloser(bytes.NewReader(bodyByte)), nil
}
// hasChallange checks if the body contains a challange by regex matching
func hasChallange(body []byte) bool {
return challangeRegex.Match(body)
}

View File

@@ -52,7 +52,7 @@ func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, m *monitoring.Metric
fmt.Println("unable to get peers from cache for infohash:", infoHash)
} else {
m.CacheMisses.WithLabelValues("peers").Inc()
fmt.Println("get from cache> leech:", leech, "seed:", seed)
fmt.Println("hash:", infoHash, "get from cache -> leech:", leech, "seed:", seed)
return leech, seed, nil
}
@@ -87,16 +87,18 @@ func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, m *monitoring.Metric
var peer peers
for i := 0; i < len(trackers); i++ {
select {
case <-errChan:
// discard error
case peer = <-peerChan:
err = setPeersToCache(ctx, r, infoHash, peer.Leechers, peer.Seeders)
if err != nil {
fmt.Println(err)
} else {
fmt.Println("hash:", infoHash, "get from tracker -> leech:", peer.Leechers, "seed:", peer.Seeders)
}
return peer.Leechers, peer.Seeders, nil
case err := <-errChan:
fmt.Println(err)
}
}
return 0, 0, fmt.Errorf("unable to get peers from trackers")
return 0, 0, fmt.Errorf("unable to get peers from trackers for infohash: %s", infoHash)
}