diff --git a/__pycache__/main.cpython-310-pytest-7.1.1.pyc b/__pycache__/main.cpython-310-pytest-7.1.1.pyc new file mode 100644 index 0000000..0e45eb0 Binary files /dev/null and b/__pycache__/main.cpython-310-pytest-7.1.1.pyc differ diff --git a/app/__pycache__/__init__.cpython-310.pyc b/app/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..8670a56 Binary files /dev/null and b/app/__pycache__/__init__.cpython-310.pyc differ diff --git a/app/__pycache__/app.cpython-310.pyc b/app/__pycache__/app.cpython-310.pyc new file mode 100644 index 0000000..365ac13 Binary files /dev/null and b/app/__pycache__/app.cpython-310.pyc differ diff --git a/app/app.py b/app/app.py index 8eb50f8..fb8e24d 100644 --- a/app/app.py +++ b/app/app.py @@ -1,5 +1,5 @@ import asyncio -from spider.parse import ParseConnector, FrontItem +from spider.parse import ParseConnector, EndOfItmeLines import spider.spider as spider @@ -13,7 +13,7 @@ class Application: self.__parser_loop = asyncio.new_event_loop() if parseConnector is None: - self.parseConnector = ParseConnector(loop=self.__parser_loop) + self.parseConnector = ParseConnector() else: self.parseConnector = parseConnector @@ -22,19 +22,30 @@ class Application: self.tasks = {} for task in tasks: - assert isinstance(task, spider.SpiderTas) - self.tasks[task.tag] = task.get_tasks() + assert isinstance(task, spider.SpiderTask) + self.tasks[task.tag] = task.create_tasks() elif isinstance(tasks, spider.SpiderTask): self.type = "single_spider" - self.tasks = {tasks.tag: task.get_tasks()} + self.tasks = {tasks.tag: tasks.create_tasks()} else: raise TypeError + def __creat_tasks(self): + tasks = [] + for key in self.tasks.keys(): + item = self.tasks[key] + + tasks.append(*(item)) + + return tasks + async def start(self): self.parseConnector.start_parse() - for r in asyncio.as_completed(self.tasks): + for r in asyncio.as_completed(self.__creat_tasks()): completed = await r - await self.parseConnector.add( - FrontItem(completed.type, completed.content, completed.cb) - ) + self.parseConnector.add(completed) + + self.parseConnector.add(EndOfItmeLines()) + self.parseConnector.queue.join() + self.parseConnector.process.join() diff --git a/main.py b/main.py index 82ac057..186caca 100644 --- a/main.py +++ b/main.py @@ -1,25 +1,32 @@ -from app import Application +from app.app import Application from spider.spider import SpiderTask from spider.request import Requests import asyncio -def main(): - tasks = [ - SpiderTask( - "news", - Requests( - "http://www.baidu.com", - [], - "get", - {}, - ), - ), - ] +def p(content): + + currentConfirmedCount = content["results"][0]["currentConfirmedCount"] + + print(currentConfirmedCount) + + +async def main(): + + news_task = await SpiderTask.init( + tag="news", + req=Requests("https://lab.isaaclin.cn/nCoV/api/overall", "get"), + headers="", + dataType="json", + parsers=(p,), + ) + + tasks = [news_task] app = Application(tasks=tasks) - asyncio.run(app.start()) + + await app.start() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/spider/__pycache__/__init__.cpython-310.pyc b/spider/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..438f259 Binary files /dev/null and b/spider/__pycache__/__init__.cpython-310.pyc differ diff --git a/spider/__pycache__/parse.cpython-310.pyc b/spider/__pycache__/parse.cpython-310.pyc new file mode 100644 index 0000000..387493a Binary files /dev/null and b/spider/__pycache__/parse.cpython-310.pyc differ diff --git a/spider/__pycache__/request.cpython-310.pyc b/spider/__pycache__/request.cpython-310.pyc new file mode 100644 index 0000000..d31646d Binary files /dev/null and b/spider/__pycache__/request.cpython-310.pyc differ diff --git a/spider/__pycache__/spider.cpython-310.pyc b/spider/__pycache__/spider.cpython-310.pyc new file mode 100644 index 0000000..0283d2f Binary files /dev/null and b/spider/__pycache__/spider.cpython-310.pyc differ diff --git a/spider/parse.py b/spider/parse.py index df3b515..93562e9 100644 --- a/spider/parse.py +++ b/spider/parse.py @@ -1,6 +1,6 @@ -import threading -import asyncio from collections.abc import Callable +from multiprocessing import Process +from multiprocessing import JoinableQueue, Pipe class OutPutItem: @@ -11,6 +11,7 @@ class OutPutItem: class FrontItem: def __init__( self, + status_code: int, contentType: str, content: str, parserFunction: Callable[[str], OutPutItem], @@ -18,38 +19,74 @@ class FrontItem: self.type = contentType self.content = content self.parserFunction = parserFunction + self.status_code = status_code + + +class EndOfItmeLines(FrontItem): + def __init__( + self, + status_code=None, + contentType=None, + content=None, + parserFunction=None, + ): + super().__init__(status_code, contentType, content, parserFunction) + + +def parsing(queue, conn): + while True: + + try: + item: FrontItem = queue.get() + + if isinstance(item, EndOfItmeLines): + break + + item.parserFunction(item.content) + + except: + print("something wrong") + + finally: + queue.task_done() class ParseConnector: - def __init__(self, loop, tr=None): - self.__queue = asyncio.Queue() - self.__loop = loop - if tr is None: - self.__threading = threading.Thread( - target=__NewTr, - args=(loop,), - ) - self.__threading.daemon = True + def __init__(self, process=None, conn=None, queue=None): + self.process = process + self.conn = conn + self.queue = queue + self.finished = False - else: - self.__threading = tr + def __del__(self): + self.conn.close() + self.process.close() - self.__threading.start() - - async def add(self, item: FrontItem): - await self.__queue.put(item) - - async def parsing(self): - while True: - item: FrontItem = await self.__queue.get() - item.parserFunction(item.content) - self.__queue.task_done() - await self.__queue.join() + def add(self, item: FrontItem): + self.queue.put(item) + if isinstance(item, EndOfItmeLines): + self.finished = True def start_parse(self): - asyncio.run_coroutine_threadsafe(self.parsing(), self.__loop) + if self.conn is None: + parent_conn, child_conn = Pipe() + self.conn = parent_conn -def __NewTr(loop: asyncio.AbstractEventLoop): - asyncio.set_event_loop(loop) - loop.run_forever() + if self.queue is None: + self.queue = JoinableQueue() + + if self.process is None: + self.process = Process( + target=parsing, + args=( + self.queue, + child_conn, + ), + ) + self.process.daemon = True + + self.process.start() + + def is_parsing(self): + return self.finished diff --git a/spider/request.py b/spider/request.py index 5ba53a3..e605578 100644 --- a/spider/request.py +++ b/spider/request.py @@ -1,13 +1,20 @@ from typing import List, Dict +from collections.abc import Callable class Requests: def __init__( self, url: str, - group: List[str], method: str, + group: List[str] = None, + rule: Callable[[str], str] = None, ): self.root = url self.method = method + + if rule is not None and group is not None: + raise TypeError + self.group = group + self.rule = rule diff --git a/spider/spider.py b/spider/spider.py index 6330f19..584aa1d 100644 --- a/spider/spider.py +++ b/spider/spider.py @@ -1,8 +1,22 @@ import asyncio import aiohttp -from request import Requests +from .request import Requests +from .parse import FrontItem import urllib -from typing import List +from enum import Enum, auto +from typing import Tuple +from collections.abc import Callable + + +class SpiderTaskStatus(Enum): + INIT = auto() + PROCESSING = auto() + SUCCESSED = auto() + FAILED = auto() + + +def test(content): + print(content) class SpiderTask: @@ -10,36 +24,121 @@ class SpiderTask: self, tag: str, req: Requests, - headers, - cookies=None, - proxy=None, - parser=None, + session: aiohttp.ClientSession, + parsers: Tuple[Callable[[str], str]], + dataType: str, ): self.tag = tag self.req = req - self.session = aiohttp.ClientSession( - headers=headers, - cookies=cookies, + self.session = session + self.__tasks = None + self.__wrong_list = [] + self.__parsers = parsers + self.status = SpiderTaskStatus.INIT + self.finished = 0 + self.dataType = dataType + + @classmethod + async def init( + self, + tag: str, + req: Requests, + headers, + parsers=(test,), + dataType: str = "text", + cookies=None, + proxy=None, + ): + session = aiohttp.ClientSession(headers=headers) + return SpiderTask( + tag, + req, + session, + parsers=parsers, + dataType=dataType, ) - self.proxy = proxy - - tmp = [] - if len(req.group) != 0: - for r in req.group: - tmp.append( - asyncio.create_task( - aiohttp.request( - req.method, - url=urllib.parse.urljoin(req.root, r), - ) + def create_tasks(self): + async def rq(path: str, parser): + async with self.session.request(self.req.method, path) as response: + if response.status != 200: + await self.task_done(failed=True, path=path) + return FrontItem( + response.status, + None, + None, + None, ) - ) + else: + await self.task_done() - self.__tasks: List[asyncio.Task] = tmp - if isinstance(parser, list) and len(parser) != len(req.group): - raise TypeError() - self.parser = parser + if self.dataType == "text": + c = await response.text() + elif self.dataType == "json": + c = await response.json() + else: + c = await response.read() + + return FrontItem( + 200, + response.content_type, + c, + parser, + ) + + if self.__tasks is None: + tmp = [] + + if self.req.group is not None: + if len(self.__parsers) == 1: + for r in self.req.group: + tmp.append( + rq(urllib.parse.urljoin(self.req.root, r)), + self.__parsers[0], + ) + else: + for r, parser in zip(self.req.group, self.__parsers): + tmp.append( + rq(urllib.parse.urljoin(self.req.root, r)), + parser, + ) + elif self.req.rule is not None: + if len(self.__parsers) == 1: + for r in self.req.rule(self.req.root): + tmp.append(self.__parsers[0]) + else: + for r, parser in zip(self.req.group, self.__parsers): + for r in self.req.rule(self.req.root): + tmp.append(parser) + + else: + assert len(self.__parsers) == 1 + tmp.append(rq(self.req.root, self.__parsers[0])) + + self.__tasks = tmp + assert len(self.__parsers) == 1 or len(self.__parsers) == len(self.__tasks) + return self.__tasks + + async def close(self): + await self.session.close() + + async def task_done(self, failed: bool = False, path: str = None): + if failed: + assert path is not None + self.add_wrong_item(path) + self.finished += 1 + + if self.finished == len(self.__tasks): + await self.close() + + if len(self.__wrong_list) != 0: + self.status = SpiderTaskStatus.FAILED + else: + self.status = SpiderTaskStatus.SUCCESSED + else: + self.status = SpiderTaskStatus.PROCESSING + + print(self.status) def delete_task(self, r: int): self.__tasks.pop(r) @@ -56,5 +155,11 @@ class SpiderTask: ) ) + def add_wrong_item(self, path: str): + self.__wrong_list.append(path) + + def get_wrong_item(self): + return self.__wrong_list + def get_tasks(self): return self.tasks