This commit is contained in:
sleptworld 2022-03-21 16:25:32 +08:00
parent 7f3f99f24b
commit de17741b9a
12 changed files with 245 additions and 78 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,5 +1,5 @@
import asyncio import asyncio
from spider.parse import ParseConnector, FrontItem from spider.parse import ParseConnector, EndOfItmeLines
import spider.spider as spider import spider.spider as spider
@ -13,7 +13,7 @@ class Application:
self.__parser_loop = asyncio.new_event_loop() self.__parser_loop = asyncio.new_event_loop()
if parseConnector is None: if parseConnector is None:
self.parseConnector = ParseConnector(loop=self.__parser_loop) self.parseConnector = ParseConnector()
else: else:
self.parseConnector = parseConnector self.parseConnector = parseConnector
@ -22,19 +22,30 @@ class Application:
self.tasks = {} self.tasks = {}
for task in tasks: for task in tasks:
assert isinstance(task, spider.SpiderTas) assert isinstance(task, spider.SpiderTask)
self.tasks[task.tag] = task.get_tasks() self.tasks[task.tag] = task.create_tasks()
elif isinstance(tasks, spider.SpiderTask): elif isinstance(tasks, spider.SpiderTask):
self.type = "single_spider" self.type = "single_spider"
self.tasks = {tasks.tag: task.get_tasks()} self.tasks = {tasks.tag: tasks.create_tasks()}
else: else:
raise TypeError raise TypeError
def __creat_tasks(self):
    """Flatten the per-tag task collections into one list.

    ``self.tasks`` maps a spider tag to the collection returned by that
    spider's ``create_tasks()``.

    Returns:
        list: every entry of every tag, in the mapping's iteration order.
    """
    tasks = []
    for item in self.tasks.values():
        # extend(), not append(*item): append() accepts exactly one argument,
        # so unpacking failed for any tag holding zero or several tasks.
        tasks.extend(item)
    return tasks
async def start(self): async def start(self):
self.parseConnector.start_parse() self.parseConnector.start_parse()
for r in asyncio.as_completed(self.tasks): for r in asyncio.as_completed(self.__creat_tasks()):
completed = await r completed = await r
await self.parseConnector.add( self.parseConnector.add(completed)
FrontItem(completed.type, completed.content, completed.cb)
) self.parseConnector.add(EndOfItmeLines())
self.parseConnector.queue.join()
self.parseConnector.process.join()

37
main.py
View File

@ -1,25 +1,32 @@
from app import Application from app.app import Application
from spider.spider import SpiderTask from spider.spider import SpiderTask
from spider.request import Requests from spider.request import Requests
import asyncio import asyncio
def main(): def p(content):
tasks = [
SpiderTask( currentConfirmedCount = content["results"][0]["currentConfirmedCount"]
"news",
Requests( print(currentConfirmedCount)
"http://www.baidu.com",
[],
"get", async def main():
{},
), news_task = await SpiderTask.init(
), tag="news",
] req=Requests("https://lab.isaaclin.cn/nCoV/api/overall", "get"),
headers="",
dataType="json",
parsers=(p,),
)
tasks = [news_task]
app = Application(tasks=tasks) app = Application(tasks=tasks)
asyncio.run(app.start())
await app.start()
if __name__ == "__main__": if __name__ == "__main__":
main() asyncio.run(main())

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,6 +1,6 @@
import threading
import asyncio
from collections.abc import Callable from collections.abc import Callable
from multiprocessing import Process
from multiprocessing import JoinableQueue, Pipe
class OutPutItem: class OutPutItem:
@ -11,6 +11,7 @@ class OutPutItem:
class FrontItem: class FrontItem:
def __init__( def __init__(
self, self,
status_code: int,
contentType: str, contentType: str,
content: str, content: str,
parserFunction: Callable[[str], OutPutItem], parserFunction: Callable[[str], OutPutItem],
@ -18,38 +19,74 @@ class FrontItem:
self.type = contentType self.type = contentType
self.content = content self.content = content
self.parserFunction = parserFunction self.parserFunction = parserFunction
self.status_code = status_code
class EndOfItmeLines(FrontItem):
    """Sentinel item that marks the end of the item stream.

    Every field defaults to ``None``; the sentinel is recognised by its type
    (via ``isinstance``), not by its contents.
    """

    def __init__(
        self,
        status_code=None,
        contentType=None,
        content=None,
        parserFunction=None,
    ):
        super().__init__(
            status_code=status_code,
            contentType=contentType,
            content=content,
            parserFunction=parserFunction,
        )
def parsing(queue, conn):
    """Consume items from ``queue`` and run each item's parser on its content.

    The loop ends when an ``EndOfItmeLines`` sentinel is received. Every item
    taken from the queue, the sentinel included, is acknowledged with
    ``task_done()`` so that a ``join()`` on the queue can return.

    Args:
        queue: Joinable queue delivering ``FrontItem`` instances.
        conn: Connection object passed in by the caller; not used here.
    """
    import traceback

    while True:
        # Fetch outside the try block: task_done() may only be called for an
        # item that was actually retrieved.
        item: FrontItem = queue.get()
        try:
            if isinstance(item, EndOfItmeLines):
                break
            item.parserFunction(item.content)
        except Exception:
            # One failing parser must not stop the consumer, but the cause
            # has to stay visible. KeyboardInterrupt/SystemExit propagate.
            print("something wrong")
            traceback.print_exc()
        finally:
            queue.task_done()
class ParseConnector: class ParseConnector:
def __init__(self, loop, tr=None): def __init__(self, process=None, conn=None, queue=None):
self.__queue = asyncio.Queue() self.process = process
self.__loop = loop self.conn = conn
if tr is None: self.queue = queue
self.__threading = threading.Thread( self.finished = False
target=__NewTr,
args=(loop,),
)
self.__threading.daemon = True
def __del__(self):
    """Release the pipe end and the process handle, when they exist.

    A finalizer must not raise: resources that were never created are
    skipped, and a process that is still running is left alone.
    """
    conn = getattr(self, "conn", None)
    if conn is not None:
        conn.close()

    process = getattr(self, "process", None)
    if process is not None:
        try:
            process.close()
        except ValueError:
            # Process.close() raises ValueError while the process is running.
            pass
self.__threading.start() def add(self, item: FrontItem):
self.queue.put(item)
async def add(self, item: FrontItem): if isinstance(item, EndOfItmeLines):
await self.__queue.put(item) self.finished = True
async def parsing(self):
while True:
item: FrontItem = await self.__queue.get()
item.parserFunction(item.content)
self.__queue.task_done()
await self.__queue.join()
def start_parse(self):
    """Create any missing IPC resources and start the parser process.

    A pipe, a joinable queue and a daemon ``Process`` running ``parsing`` are
    each created only when the matching attribute is still ``None``; anything
    supplied by the caller is reused. The process is then started.
    """
    # Bound up front: when a connection was supplied no pipe is created here,
    # so there is no child end to hand to the worker.
    child_conn = None
    if self.conn is None:
        parent_conn, child_conn = Pipe()
        self.conn = parent_conn

    if self.queue is None:
        self.queue = JoinableQueue()

    if self.process is None:
        self.process = Process(
            target=parsing,
            args=(self.queue, child_conn),
        )
        self.process.daemon = True

    self.process.start()
def is_parsing(self):
    """Return the ``finished`` flag.

    NOTE(review): the name suggests "still parsing", yet the value returned
    is ``finished`` itself rather than its negation; confirm what callers
    expect before relying on it.
    """
    return self.finished

View File

@ -1,13 +1,20 @@
from typing import List, Dict from typing import List, Dict
from collections.abc import Callable
class Requests: class Requests:
def __init__( def __init__(
self, self,
url: str, url: str,
group: List[str],
method: str, method: str,
group: List[str] = None,
rule: Callable[[str], str] = None,
): ):
self.root = url self.root = url
self.method = method self.method = method
if rule is not None and group is not None:
raise TypeError
self.group = group self.group = group
self.rule = rule

View File

@ -1,8 +1,22 @@
import asyncio import asyncio
import aiohttp import aiohttp
from request import Requests from .request import Requests
from .parse import FrontItem
import urllib import urllib
from typing import List from enum import Enum, auto
from typing import Tuple
from collections.abc import Callable
class SpiderTaskStatus(Enum):
    """Lifecycle states reported by a spider task."""

    # Set when the task object is constructed.
    INIT = auto()
    # At least one request finished, others are still outstanding.
    PROCESSING = auto()
    # All requests finished and none was recorded as wrong.
    SUCCESSED = auto()
    # All requests finished and at least one was recorded as wrong.
    FAILED = auto()
def test(content):
    """Print ``content`` unchanged; used as the default parser callback."""
    print(content)
class SpiderTask: class SpiderTask:
@ -10,36 +24,121 @@ class SpiderTask:
self, self,
tag: str, tag: str,
req: Requests, req: Requests,
headers, session: aiohttp.ClientSession,
cookies=None, parsers: Tuple[Callable[[str], str]],
proxy=None, dataType: str,
parser=None,
): ):
self.tag = tag self.tag = tag
self.req = req self.req = req
self.session = aiohttp.ClientSession( self.session = session
headers=headers, self.__tasks = None
cookies=cookies, self.__wrong_list = []
self.__parsers = parsers
self.status = SpiderTaskStatus.INIT
self.finished = 0
self.dataType = dataType
@classmethod
async def init(
    cls,
    tag: str,
    req: Requests,
    headers,
    parsers=(test,),
    dataType: str = "text",
    cookies=None,
    proxy=None,
):
    """Alternate constructor that also creates the HTTP session.

    Args:
        tag: Identifier of the task.
        req: Request description (root URL, method, optional group/rule).
        headers: Headers passed to the ``aiohttp.ClientSession``.
        parsers: One parser for all requests, or one parser per request.
        dataType: ``"text"``, ``"json"`` or anything else for raw bytes.
        cookies: Cookies passed to the ``aiohttp.ClientSession``.
        proxy: Accepted for interface compatibility; not used here.

    Returns:
        A new instance of ``cls`` bound to the freshly created session.
    """
    # NOTE(review): presumably a coroutine so that the session is created
    # while an event loop is running; confirm against aiohttp guidance.
    session = aiohttp.ClientSession(headers=headers, cookies=cookies)
    # cls(...) instead of a hard-coded class name keeps subclasses working.
    return cls(
        tag,
        req,
        session,
        parsers=parsers,
        dataType=dataType,
    )
def create_tasks(self):
    """Build, once, the request coroutines of this task and return them.

    One coroutine is created per target URL:

    * ``req.group`` is set: every entry is joined onto ``req.root``;
    * ``req.rule`` is set: every value produced by ``req.rule(req.root)`` is
      joined onto ``req.root`` (an absolute URL is kept as is).
      NOTE(review): assumes the rule yields URLs or paths; confirm;
    * neither is set: ``req.root`` itself.

    Either a single parser is shared by all URLs, or there is exactly one
    parser per URL. The list is cached, so later calls return the same
    coroutine objects.

    Returns:
        list: coroutines, each resolving to a ``FrontItem``.

    Raises:
        AssertionError: if the number of parsers is neither 1 nor the number
            of target URLs.
    """
    # Explicit submodule import: a bare ``import urllib`` does not guarantee
    # that ``urllib.parse`` has been loaded.
    from urllib.parse import urljoin

    async def rq(path: str, parser):
        """Request ``path`` and wrap the outcome in a ``FrontItem``."""
        async with self.session.request(self.req.method, path) as response:
            status = response.status
            if status == 200:
                if self.dataType == "text":
                    body = await response.text()
                elif self.dataType == "json":
                    body = await response.json()
                else:
                    body = await response.read()
                item = FrontItem(200, response.content_type, body, parser)
            else:
                item = FrontItem(status, None, None, None)

        # Report completion only after the body has been read and the
        # response released: task_done() closes the shared session once the
        # last request has finished.
        if status == 200:
            await self.task_done()
        else:
            await self.task_done(failed=True, path=path)
        return item

    if self.__tasks is None:
        if self.req.group is not None:
            paths = [urljoin(self.req.root, r) for r in self.req.group]
        elif self.req.rule is not None:
            paths = [
                urljoin(self.req.root, r)
                for r in self.req.rule(self.req.root)
            ]
        else:
            paths = [self.req.root]

        parsers = self.__parsers
        # Checked before any coroutine is created so that a mismatch does not
        # leave never-awaited coroutines behind.
        assert len(parsers) == 1 or len(parsers) == len(paths)
        if len(parsers) == 1:
            parsers = [parsers[0]] * len(paths)

        self.__tasks = [rq(path, parser) for path, parser in zip(paths, parsers)]

    return self.__tasks
async def close(self):
    """Close the HTTP session held by this task."""
    await self.session.close()
async def task_done(self, failed: bool = False, path: str = None):
    """Count one finished request and refresh ``status``.

    Args:
        failed: Whether the request is to be recorded as wrong.
        path: URL of the request; required when ``failed`` is true.

    Once the finished counter equals the number of tasks, the session is
    closed and the status becomes ``FAILED`` if any wrong item was recorded,
    otherwise ``SUCCESSED``. Before that the status is ``PROCESSING``. The
    resulting status is printed on every call.
    """
    if failed:
        assert path is not None
        self.add_wrong_item(path)

    self.finished += 1
    all_done = self.finished == len(self.__tasks)
    if not all_done:
        self.status = SpiderTaskStatus.PROCESSING
    else:
        await self.close()
        if self.__wrong_list:
            self.status = SpiderTaskStatus.FAILED
        else:
            self.status = SpiderTaskStatus.SUCCESSED

    print(self.status)
def delete_task(self, r: int): def delete_task(self, r: int):
self.__tasks.pop(r) self.__tasks.pop(r)
@ -56,5 +155,11 @@ class SpiderTask:
) )
) )
def add_wrong_item(self, path: str):
    """Append ``path`` to the list of wrong (failed) request paths."""
    self.__wrong_list.append(path)
def get_wrong_item(self):
    """Return the list of paths recorded as wrong (the live list, not a copy)."""
    return self.__wrong_list
def get_tasks(self):
    """Return the cached task list (``None`` until ``create_tasks`` has run).

    The list is stored in the private ``__tasks`` attribute; no ``tasks``
    attribute is ever assigned, so reading ``self.tasks`` raised
    ``AttributeError``.
    """
    return self.__tasks