Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

from collections import defaultdict 

from twisted.internet.defer import Deferred, DeferredList 

from twisted.python.failure import Failure 

 

from scrapy.utils.defer import mustbe_deferred, defer_result 

from scrapy import log 

from scrapy.utils.request import request_fingerprint 

from scrapy.utils.misc import arg_to_iter 

 

class MediaPipeline(object):
    """Abstract pipeline that downloads media referenced by items.

    Subclasses override ``get_media_requests`` to emit download requests
    for an item, and the other overridable-interface methods to post-process
    results.  Results are cached per request fingerprint, so the same media
    is downloaded at most once per spider; concurrent requests for the same
    fingerprint share a single in-flight download.
    """

    # When True, item_completed() logs every failed media result.
    LOG_FAILED_RESULTS = True

    class SpiderInfo(object):
        """Per-spider bookkeeping for in-flight and finished downloads."""

        def __init__(self, spider):
            self.spider = spider
            self.downloading = set()          # fingerprints currently being fetched
            self.downloaded = {}              # fingerprint -> cached result (or Failure)
            self.waiting = defaultdict(list)  # fingerprint -> deferreds awaiting that result

    def __init__(self, download_func=None):
        # download_func is an optional download override used by tests (see
        # _check_media_to_download); normally downloads go through the engine.
        self.spiderinfo = {}
        self.download_func = download_func

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline, preferring a from_settings() constructor if defined."""
        try:
            pipe = cls.from_settings(crawler.settings)
        except AttributeError:
            pipe = cls()
        pipe.crawler = crawler
        return pipe

    def open_spider(self, spider):
        self.spiderinfo[spider] = self.SpiderInfo(spider)

    def close_spider(self, spider):
        del self.spiderinfo[spider]

    def process_item(self, item, spider):
        """Fire all media requests for *item*; fires item_completed() when done."""
        info = self.spiderinfo[spider]
        requests = arg_to_iter(self.get_media_requests(item, info))
        dlist = [self._process_request(r, info) for r in requests]
        dfd = DeferredList(dlist, consumeErrors=1)
        return dfd.addCallback(self.item_completed, item, info)

    def _process_request(self, request, info):
        """Return a deferred for *request*'s result, deduplicating by fingerprint."""
        fp = request_fingerprint(request)
        cb = request.callback or (lambda _: _)
        eb = request.errback
        request.callback = None
        request.errback = None

        # Return cached result if request was already seen
        if fp in info.downloaded:
            return defer_result(info.downloaded[fp]).addCallbacks(cb, eb)

        # Otherwise, wait for result
        wad = Deferred().addCallbacks(cb, eb)
        info.waiting[fp].append(wad)

        # Check if request is downloading right now to avoid doing it twice
        if fp in info.downloading:
            return wad

        # Download request checking media_to_download hook output first
        info.downloading.add(fp)
        dfd = mustbe_deferred(self.media_to_download, request, info)
        dfd.addCallback(self._check_media_to_download, request, info)
        dfd.addBoth(self._cache_result_and_execute_waiters, fp, info)
        dfd.addErrback(log.err, spider=info.spider)
        return dfd.addBoth(lambda _: wad)  # it must return wad at last

    def _check_media_to_download(self, result, request, info):
        # A non-None result from the media_to_download() hook skips the download.
        if result is not None:
            return result
        if self.download_func:
            # this ugly code was left only to support tests. TODO: remove
            dfd = mustbe_deferred(self.download_func, request, info.spider)
        else:
            request.meta['handle_httpstatus_all'] = True
            dfd = self.crawler.engine.download(request, info.spider)
        # Success/failure routing is identical for both download paths.
        dfd.addCallbacks(
            callback=self.media_downloaded, callbackArgs=(request, info),
            errback=self.media_failed, errbackArgs=(request, info))
        return dfd

    def _cache_result_and_execute_waiters(self, result, fp, info):
        if isinstance(result, Failure):
            # minimize cached information for failure
            result.cleanFailure()
            result.frames = []
            result.stack = None
        info.downloading.remove(fp)
        info.downloaded[fp] = result  # cache result
        for wad in info.waiting.pop(fp):
            defer_result(result).chainDeferred(wad)

    ### Overridable Interface
    def media_to_download(self, request, info):
        """Check request before starting download"""
        pass

    def get_media_requests(self, item, info):
        """Returns the media requests to download"""
        pass

    def media_downloaded(self, response, request, info):
        """Handler for success downloads"""
        return response

    def media_failed(self, failure, request, info):
        """Handler for failed downloads"""
        return failure

    def item_completed(self, results, item, info):
        """Called per item when all media requests has been processed"""
        if self.LOG_FAILED_RESULTS:
            msg = '%s found errors processing %s' % (self.__class__.__name__, item)
            for ok, value in results:
                if not ok:
                    log.err(value, msg, spider=info.spider)
        return item