| """ robotparser.py | |
| Copyright (C) 2000 Bastian Kleineidam | |
| You can choose between two licenses when using this package: | |
| 1) GNU GPLv2 | |
| 2) PSF license for Python 2.2 | |
| The robots.txt Exclusion Protocol is implemented as specified in | |
| http://www.robotstxt.org/norobots-rfc.txt | |
| """ | |
| import collections | |
| import urllib.parse | |
| import urllib.request | |
| __all__ = ["RobotFileParser"] | |
| RequestRate = collections.namedtuple("RequestRate", "requests seconds") | |
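# For example, a "Request-rate: 3/20" line in robots.txt parses into
# RequestRate(requests=3, seconds=20), i.e. at most 3 requests per
# 20 seconds (see the request-rate branch in RobotFileParser.parse).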

class RobotFileParser:
    """ This class provides a set of methods to read, parse and answer
    questions about a single robots.txt file.

    """

    def __init__(self, url=''):
        self.entries = []
        self.sitemaps = []
        self.default_entry = None
        self.disallow_all = False
        self.allow_all = False
        self.set_url(url)
        self.last_checked = 0

    def mtime(self):
        """Returns the time the robots.txt file was last fetched.

        This is useful for long-running web spiders that need to
        check for new robots.txt files periodically.

        """
        return self.last_checked

    def modified(self):
        """Sets the time the robots.txt file was last fetched to the
        current time.

        """
        import time
        self.last_checked = time.time()

    def set_url(self, url):
        """Sets the URL referring to a robots.txt file."""
        self.url = url
        self.host, self.path = urllib.parse.urlparse(url)[1:3]

    def read(self):
        """Reads the robots.txt URL and feeds it to the parser."""
        try:
            f = urllib.request.urlopen(self.url)
        except urllib.error.HTTPError as err:
            if err.code in (401, 403):
                self.disallow_all = True
            elif err.code >= 400 and err.code < 500:
                self.allow_all = True
        else:
            raw = f.read()
            self.parse(raw.decode("utf-8").splitlines())

    def _add_entry(self, entry):
        if "*" in entry.useragents:
            # the default entry is considered last
            if self.default_entry is None:
                # the first default entry wins
                self.default_entry = entry
        else:
            self.entries.append(entry)

    def parse(self, lines):
        """Parse the input lines from a robots.txt file.

        We allow that a user-agent: line is not preceded by
        one or more blank lines.
        """
        # states:
        #   0: start state
        #   1: saw user-agent line
        #   2: saw an allow or disallow line
        state = 0
        entry = Entry()

        self.modified()
        for line in lines:
            if not line:
                if state == 1:
                    entry = Entry()
                    state = 0
                elif state == 2:
                    self._add_entry(entry)
                    entry = Entry()
                    state = 0
            # remove optional comment and strip line
            i = line.find('#')
            if i >= 0:
                line = line[:i]
            line = line.strip()
            if not line:
                continue
            line = line.split(':', 1)
            if len(line) == 2:
                line[0] = line[0].strip().lower()
                line[1] = urllib.parse.unquote(line[1].strip())
                if line[0] == "user-agent":
                    if state == 2:
                        self._add_entry(entry)
                        entry = Entry()
                    entry.useragents.append(line[1])
                    state = 1
                elif line[0] == "disallow":
                    if state != 0:
                        entry.rulelines.append(RuleLine(line[1], False))
                        state = 2
                elif line[0] == "allow":
                    if state != 0:
                        entry.rulelines.append(RuleLine(line[1], True))
                        state = 2
                elif line[0] == "crawl-delay":
                    if state != 0:
                        # before trying to convert to int we need to make
                        # sure that robots.txt has valid syntax otherwise
                        # it will crash
                        if line[1].strip().isdigit():
                            entry.delay = int(line[1])
                        state = 2
                elif line[0] == "request-rate":
                    if state != 0:
                        numbers = line[1].split('/')
                        # check if all values are sane
                        if (len(numbers) == 2 and numbers[0].strip().isdigit()
                                and numbers[1].strip().isdigit()):
                            entry.req_rate = RequestRate(int(numbers[0]),
                                                         int(numbers[1]))
                        state = 2
                elif line[0] == "sitemap":
                    # According to http://www.sitemaps.org/protocol.html
                    # "This directive is independent of the user-agent line,
                    # so it doesn't matter where you place it in your file."
                    # Therefore we do not change the state of the parser.
                    self.sitemaps.append(line[1])
        if state == 2:
            self._add_entry(entry)
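
    # Worked example for the state machine above (hypothetical input):
    #
    #     User-agent: *        -> state 1, agent added to a fresh Entry
    #     Disallow: /private   -> state 2, RuleLine appended
    #     (blank line)         -> entry committed via _add_entry(), state 0
    #
    # An entry that is still open when the input ends (state == 2) is
    # committed by the final check at the bottom of parse().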
    def can_fetch(self, useragent, url):
        """using the parsed robots.txt decide if useragent can fetch url"""
        if self.disallow_all:
            return False
        if self.allow_all:
            return True
        # Until the robots.txt file has been read or found not
        # to exist, we must assume that no url is allowable.
        # This prevents false positives when a user erroneously
        # calls can_fetch() before calling read().
        if not self.last_checked:
            return False
        # search for given user agent matches
        # the first match counts
        parsed_url = urllib.parse.urlparse(urllib.parse.unquote(url))
        url = urllib.parse.urlunparse(('', '', parsed_url.path,
                                       parsed_url.params, parsed_url.query,
                                       parsed_url.fragment))
        url = urllib.parse.quote(url)
        if not url:
            url = "/"
        for entry in self.entries:
            if entry.applies_to(useragent):
                return entry.allowance(url)
        # try the default entry last
        if self.default_entry:
            return self.default_entry.allowance(url)
        # agent not found ==> access granted
        return True

    def crawl_delay(self, useragent):
        if not self.mtime():
            return None
        for entry in self.entries:
            if entry.applies_to(useragent):
                return entry.delay
        if self.default_entry:
            return self.default_entry.delay
        return None

    def request_rate(self, useragent):
        if not self.mtime():
            return None
        for entry in self.entries:
            if entry.applies_to(useragent):
                return entry.req_rate
        if self.default_entry:
            return self.default_entry.req_rate
        return None

    def site_maps(self):
        if not self.sitemaps:
            return None
        return self.sitemaps

    def __str__(self):
        entries = self.entries
        if self.default_entry is not None:
            entries = entries + [self.default_entry]
        return '\n\n'.join(map(str, entries))


class RuleLine:
    """A rule line is a single "Allow:" (allowance==True) or "Disallow:"
    (allowance==False) followed by a path."""

    def __init__(self, path, allowance):
        if path == '' and not allowance:
            # an empty value means allow all
            allowance = True
        path = urllib.parse.urlunparse(urllib.parse.urlparse(path))
        self.path = urllib.parse.quote(path)
        self.allowance = allowance

    def applies_to(self, filename):
        return self.path == "*" or filename.startswith(self.path)

    def __str__(self):
        return ("Allow" if self.allowance else "Disallow") + ": " + self.path


class Entry:
    """An entry has one or more user-agents and zero or more rulelines"""

    def __init__(self):
        self.useragents = []
        self.rulelines = []
        self.delay = None
        self.req_rate = None

    def __str__(self):
        ret = []
        for agent in self.useragents:
            ret.append(f"User-agent: {agent}")
        if self.delay is not None:
            ret.append(f"Crawl-delay: {self.delay}")
        if self.req_rate is not None:
            rate = self.req_rate
            ret.append(f"Request-rate: {rate.requests}/{rate.seconds}")
        ret.extend(map(str, self.rulelines))
        return '\n'.join(ret)

    def applies_to(self, useragent):
        """check if this entry applies to the specified agent"""
        # split the name token and make it lower case
        useragent = useragent.split("/")[0].lower()
        for agent in self.useragents:
            if agent == '*':
                # we have the catch-all agent
                return True
            agent = agent.lower()
            if agent in useragent:
                return True
        return False

    def allowance(self, filename):
        """Preconditions:
        - our agent applies to this entry
        - filename is URL decoded"""
        for line in self.rulelines:
            if line.applies_to(filename):
                return line.allowance
        return True
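
# ---------------------------------------------------------------------------
# Usage sketch (not part of the upstream module): a minimal, offline example
# that feeds robots.txt lines straight to parse(), so no network access is
# needed.  The agent name, paths, and URLs below are hypothetical.  Note that
# allowance() honors the first matching rule, so the more specific Allow line
# must come before the broader Disallow line.
if __name__ == "__main__":
    robots_txt = """\
User-agent: *
Allow: /private/public-page.html
Disallow: /private/
Crawl-delay: 2
Request-rate: 3/20
Sitemap: http://example.com/sitemap.xml
"""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    print(rp.can_fetch("MyBot/1.0", "http://example.com/private/secret.html"))
    # False: matched by the Disallow: /private/ rule
    print(rp.can_fetch("MyBot/1.0",
                       "http://example.com/private/public-page.html"))
    # True: the Allow line matches first
    print(rp.crawl_delay("MyBot/1.0"))   # 2
    print(rp.request_rate("MyBot/1.0"))  # RequestRate(requests=3, seconds=20)
    print(rp.site_maps())                # ['http://example.com/sitemap.xml']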