""" 

This is the Scrapy engine which controls the Scheduler, Downloader and Spiders. 

 

For more information see docs/topics/architecture.rst 

 

""" 

import warnings
from time import time

from twisted.internet import defer
from twisted.python.failure import Failure

from scrapy import log, signals
from scrapy.stats import stats
from scrapy.core.downloader import Downloader
from scrapy.core.scraper import Scraper
from scrapy.exceptions import DontCloseSpider, ScrapyDeprecationWarning
from scrapy.http import Response, Request
from scrapy.utils.misc import load_object
from scrapy.utils.signal import send_catch_log, send_catch_log_deferred
from scrapy.utils.reactor import CallLaterOnce


class Slot(object):

    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set()  # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler

    def add_request(self, request):
        self.inprogress.add(request)

    def remove_request(self, request):
        self.inprogress.remove(request)
        self._maybe_fire_closing()

    def close(self):
        self.closing = defer.Deferred()
        self._maybe_fire_closing()
        return self.closing

    def _maybe_fire_closing(self):
        if self.closing and not self.inprogress:
            if self.nextcall:
                self.nextcall.cancel()
            self.closing.callback(None)
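# The Slot closing handshake, as a sketch (`req` is a hypothetical
# in-progress Request):
#
#     d = slot.close()          # creates the closing Deferred; with `req`
#                               # still in progress it does not fire yet
#     slot.remove_request(req)  # last request removed -> d fires with None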

 

 

class ExecutionEngine(object):

    def __init__(self, crawler, spider_closed_callback):
        self.settings = crawler.settings
        self.slots = {}
        self.running = False
        self.paused = False
        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
        self.downloader = Downloader(crawler)
        self.scraper = Scraper(crawler)
        self._concurrent_spiders = self.settings.getint('CONCURRENT_SPIDERS', 1)
        if self._concurrent_spiders != 1:
            warnings.warn("The CONCURRENT_SPIDERS setting is deprecated, use "
                "Scrapyd max_proc config instead", ScrapyDeprecationWarning)
        self._spider_closed_callback = spider_closed_callback

    @defer.inlineCallbacks
    def start(self):
        """Start the execution engine"""
        assert not self.running, "Engine already running"
        self.start_time = time()
        yield send_catch_log_deferred(signal=signals.engine_started)
        self.running = True

    def stop(self):
        """Stop the execution engine gracefully"""
        assert self.running, "Engine not running"
        self.running = False
        dfd = self._close_all_spiders()
        return dfd.addBoth(lambda _: self._finish_stopping_engine())

    def pause(self):
        """Pause the execution engine"""
        self.paused = True

    def unpause(self):
        """Resume the execution engine"""
        self.paused = False

    def _next_request(self, spider):
        try:
            slot = self.slots[spider]
        except KeyError:
            return

        if self.paused:
            slot.nextcall.schedule(5)
            return

        while not self._needs_backout(spider):
            if not self._next_request_from_scheduler(spider):
                break

        if slot.start_requests and not self._needs_backout(spider):
            try:
                request = slot.start_requests.next()
            except StopIteration:
                slot.start_requests = None
            except Exception:
                log.err(None, 'Obtaining request from start requests',
                        spider=spider)
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)

 

    def _needs_backout(self, spider):
        slot = self.slots[spider]
        return not self.running \
            or slot.closing \
            or self.downloader.needs_backout() \
            or self.scraper.slots[spider].needs_backout()

    def _next_request_from_scheduler(self, spider):
        slot = self.slots[spider]
        request = slot.scheduler.next_request()
        if not request:
            return
        d = self._download(request, spider)
        d.addBoth(self._handle_downloader_output, request, spider)
        d.addErrback(log.msg, spider=spider)
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(log.msg, spider=spider)
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(log.msg, spider=spider)
        return d

 

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)
        d.addErrback(log.err, spider=spider)
        return d
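    # For example, RedirectMiddleware turns a 3xx response into a fresh
    # Request for the target URL, which re-enters the engine through crawl()
    # above instead of being passed on to the scraper.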

 

    def spider_is_idle(self, spider):
        scraper_idle = spider in self.scraper.slots \
            and self.scraper.slots[spider].is_idle()
        pending = self.slots[spider].scheduler.has_pending_requests()
        downloading = bool(self.downloader.slots)
        idle = scraper_idle and not (pending or downloading)
        return idle

    @property
    def open_spiders(self):
        return self.slots.keys()

    def has_capacity(self):
        """Does the engine have capacity to handle more spiders"""
        return len(self.slots) < self._concurrent_spiders

 

    def crawl(self, request, spider):
        assert spider in self.open_spiders, \
            "Spider %r not opened when crawling: %s" % (spider.name, request)
        self.schedule(request, spider)
        self.slots[spider].nextcall.schedule()

    def schedule(self, request, spider):
        return self.slots[spider].scheduler.enqueue_request(request)

    def download(self, request, spider):
        slot = self.slots[spider]
        slot.add_request(request)
        d = self._download(request, spider)
        d.addBoth(self._downloaded, slot, request, spider)
        return d
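    # Hedged sketch of a caller (`handle_response` is a hypothetical
    # callback):
    #
    #     d = engine.download(request, spider)
    #     d.addCallback(handle_response)  # fires with the final Response;
    #                                     # _downloaded below re-downloads
    #                                     # whenever a middleware returned a
    #                                     # redirect Request instead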

 

    def _downloaded(self, response, slot, request, spider):
        slot.remove_request(request)
        return self.download(response, spider) \
                if isinstance(response, Request) else response

    def _download(self, request, spider):
        slot = self.slots[spider]
        slot.add_request(request)

        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request  # tie request to response received
                log.msg(log.formatter.crawled(request, response, spider),
                    level=log.DEBUG, spider=spider)
                send_catch_log(signal=signals.response_received,
                    response=response, request=request, spider=spider)
            return response

        def _on_error(failure):
            failure.request = request
            return failure

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success, _on_error)
        dwld.addBoth(_on_complete)
        return dwld

 

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=None, close_if_idle=True):
        assert self.has_capacity(), "No free spider slots when opening %r" % \
            spider.name
        log.msg("Spider opened", spider=spider)
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_settings(self.settings)
        slot = Slot(start_requests or (), close_if_idle, nextcall, scheduler)
        self.slots[spider] = slot
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        stats.open_spider(spider)
        yield send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()

 

    def _spider_idle(self, spider):
        """Called when a spider gets idle, i.e. when there are no remaining
        pages to download or schedule. It can be called multiple times. If an
        extension raises a DontCloseSpider exception (in a spider_idle signal
        handler) the spider is not closed until the next loop, and this
        function is guaranteed to be called (at least) once again for this
        spider.
        """
        res = send_catch_log(signal=signals.spider_idle,
            spider=spider, dont_log=DontCloseSpider)
        if any(isinstance(x, Failure) and isinstance(x.value, DontCloseSpider)
                for _, x in res):
            self.slots[spider].nextcall.schedule(5)
            return

        if self.spider_is_idle(spider):
            self.close_spider(spider, reason='finished')
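    # Sketch of the mechanism described above (hypothetical extension code,
    # not part of this module):
    #
    #     def spider_idle(self, spider):
    #         if self.still_has_work(spider):  # hypothetical helper
    #             raise DontCloseSpider
    #
    # a handler like this would be connected to the signals.spider_idle
    # signal; the engine then reschedules the idle check instead of closing.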

 

    def close_spider(self, spider, reason='cancelled'):
        """Close (cancel) spider and clear all its outstanding requests"""

        slot = self.slots[spider]
        if slot.closing:
            return slot.closing
        log.msg("Closing spider (%s)" % reason, spider=spider)

        dfd = slot.close()

        # each cleanup step below is chained with addBoth so it runs even if
        # an earlier step failed; the addErrback after each step logs that
        # step's own failure without aborting the remaining steps

        dfd.addBoth(lambda _: self.scraper.close_spider(spider))
        dfd.addErrback(log.err, spider=spider)

        dfd.addBoth(lambda _: slot.scheduler.close(reason))
        dfd.addErrback(log.err, spider=spider)

        dfd.addBoth(lambda _: send_catch_log_deferred(signal=signals.spider_closed,
            spider=spider, reason=reason))
        dfd.addErrback(log.err, spider=spider)

        dfd.addBoth(lambda _: stats.close_spider(spider, reason=reason))
        dfd.addErrback(log.err, spider=spider)

        dfd.addBoth(lambda _: log.msg("Spider closed (%s)" % reason, spider=spider))

        dfd.addBoth(lambda _: self.slots.pop(spider))
        dfd.addErrback(log.err, spider=spider)

        dfd.addBoth(lambda _: self._spider_closed_callback(spider))

        return dfd

 

    def _close_all_spiders(self):
        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
        dlist = defer.DeferredList(dfds)
        return dlist

    @defer.inlineCallbacks
    def _finish_stopping_engine(self):
        yield send_catch_log_deferred(signal=signals.engine_stopped)
        yield stats.engine_stopped()