rs_script/oa.py
2023-09-22 01:54:49 +08:00

269 lines
10 KiB
Python

import base64
import http.cookiejar
import os
import re
import sqlite3
from datetime import datetime, timedelta
from urllib.parse import urljoin

import lxml
import requests
from bs4 import BeautifulSoup
# Root URL of the OA site; every request path in this file is joined onto it.
BASE_URL = "http://222.240.44.2:8899/C6/"

# Daily time windows (start, end) that are subtracted from measured durations.
# strptime() with a time-only format yields a datetime on 1900-01-01, so the
# date part of these values is only a placeholder.
INTERVALS = [
    (datetime.strptime(opens, '%H:%M:%S'), datetime.strptime(closes, '%H:%M:%S'))
    for opens, closes in (
        ('12:00:00', '13:00:00'),
        ('18:30:00', '19:30:00'),
    )
]
class OA:
    """Scraper for the OA site's "works done" list.

    After ``login()`` and ``works()`` the instance holds:

    * ``work_over`` -- ``{JBTJ id: [{"start", "end", "spend"}, ...]}``, one
      list entry per detail row of the overtime record.
    * ``leave`` -- ``{QJSQ id: {"starttime", "endtime", "spend",
      "used_work"}}`` where ``used_work`` lists the JBTJ ids found on the
      leave record's relevance page.

    ``spend`` values are ``timedelta`` objects with the break windows passed
    to ``__total_time_excluding_intervals`` removed.
    """

    # Pulls the relative URL out of an onclick="javascript:ClickTitle('../...')".
    _CLICK_TITLE_RE = re.compile(r"javascript:ClickTitle\('\.\./(.+)'\)")
    # Pulls the djsn value out of the decoded djframe query string.
    _DJSN_RE = re.compile(r"djsn=(.+)&djtype")
    # Captures "[value]" (brackets included) from each "<![CDATA[value]]>".
    _CDATA_RE = re.compile(r"<!\[CDATA(.+?)\]>")
    _LEAVE_ID_RE = re.compile(r"QJSQ\d{8}-\d{4}")
    _WORK_OVER_ID_RE = re.compile(r"JBTJ\d{8}-\d{4}")
    # Number of fields requested per overtime row (see <No3> in the query).
    _WORK_OVER_ROW_WIDTH = 9

    def __init__(self, base_url: str, username: str, pwd: str):
        """Store the credentials and open a fresh ``requests`` session."""
        self.username = username
        self.pwd = pwd
        self.session = requests.session()
        self.base_url = base_url
        self.work_over = {}
        self.leave = {}

    def login(self):
        """Reuse the cookies saved in ``cookies.txt`` if possible, else log in.

        The saved cookies are kept only when a probe request to the
        works-done page answers with a non-error status.
        """
        if not os.path.exists("cookies.txt"):
            self.__login()
            return
        saved_jar = http.cookiejar.MozillaCookieJar()
        try:
            saved_jar.load("cookies.txt", ignore_discard=True, ignore_expires=True)
        except (http.cookiejar.LoadError, OSError):
            # An unreadable or corrupt cookie file is treated like a missing one.
            self.__login()
            return
        saved_cookies = requests.utils.dict_from_cookiejar(saved_jar)
        self.session.cookies = requests.utils.cookiejar_from_dict(saved_cookies)
        probe = self.session.get(
            urljoin(self.base_url, "JHSoft.Web.WorkFlat/FlatWorksDone.aspx"))
        if not probe.ok:
            self.__login()

    def __login(self):
        """Post the base64-encoded credentials and save the cookies on success."""
        print("re-login")
        self.session.cookies = http.cookiejar.MozillaCookieJar(filename="cookies.txt")
        login_path = urljoin(self.base_url, "JHSoft.Web.Login/AjaxForLogin.aspx")
        _u = base64.b64encode(self.username.encode('utf-8'))
        _p = base64.b64encode(self.pwd.encode('utf-8'))
        response = self.session.post(login_path, data={
            "type": "login",
            "loginCode": _u,
            "pwd": _p
        }, headers={
            "X-Requested-With": "XMLHttpRequest"
        })
        if response.ok:
            print("Login Success: {}".format(self.username))
            self.session.cookies.save(ignore_expires=True, ignore_discard=True)
        else:
            print("Login Failed, error code: {}".format(response.status_code))
            print(response.text)

    def works(self):
        """Download the works-done page and parse it into the result dicts."""
        u = urljoin(self.base_url, "JHSoft.Web.WorkFlat/FlatWorksDone.aspx")
        res = self.session.get(u)
        if res.ok:
            self.parse(res)
        else:
            print("Fetch works failed, error code: {}".format(res.status_code))

    def parse(self, response):
        """Parse every JBTJ row, then every QJSQ row, of the works-done table.

        Raises:
            Exception: if the page has no ``DGWorksDone_TBody`` element.
        """
        soup = BeautifulSoup(response.text, features="lxml")
        tbody = soup.find(id="DGWorksDone_TBody")
        if tbody is None:
            raise Exception("DGWorksDone_TBody not found in response")
        rows = tbody.find_all("tr")
        for row in rows:
            if "JBTJ" in row.text:
                self.__single_work_over_parse(row)
        for row in rows:
            if "QJSQ" in row.text:
                self.__single_leave_parse(row)

    def __fetch_bill_fields(self, node, info_sql):
        """Fetch the form fields of the record behind one table row.

        Args:
            node: the ``<tr>`` whose second child holds the ClickTitle link.
            info_sql: XML request template with one ``{}`` slot for the djsn id.

        Returns:
            ``(cut_path, fields)``. ``fields`` is the list of bracketed CDATA
            values from the WebBill response, or ``None`` when that request
            returned an error status.

        Raises:
            Exception: ``"djframe error"`` when the decode request fails.
        """
        cut_path = self._CLICK_TITLE_RE.match(node.contents[1].a['onclick']).group(1)
        decoded = self.session.get(
            urljoin(self.base_url, "JHSoft.Web.CustomQuery/GeneralXmlhttpPage.aspx"),
            params={
                "type": "base64DecodeByService",
                # NOTE(review): the fixed 45-character prefix and 2-character
                # suffix presumably strip the page path and trailing padding
                # of the link -- confirm against a real onclick value.
                "params": cut_path[45:-2]
            })
        if not decoded.ok:
            print(decoded.text)
            raise Exception("djframe error")
        djsn_id = self._DJSN_RE.match(decoded.text).group(1)
        res = self.session.post(
            urljoin(self.base_url, "JHSoft.Web.Module/eformaspx/WebBill.aspx?dataset_fields1"),
            data=info_sql.format(djsn_id))
        if not res.ok:
            return cut_path, None
        return cut_path, self._CDATA_RE.findall(res.text)

    @staticmethod
    def __bill_datetime(date_field, hour_field, minute_field):
        """Build a datetime from three bracketed fields, e.g. ``"[2023-09-01]"``."""
        return datetime.strptime(
            "{} {}:{}".format(date_field[1:-1], hour_field[1:-1], minute_field[1:-1]),
            "%Y-%m-%d %H:%M")

    def __single_leave_parse(self, leave_node):
        """Read one QJSQ row and store its time span and linked JBTJ ids.

        Raises:
            Exception: ``"djframe error"`` / ``"rely error"`` when a required
                request fails or the relevance id is missing.
        """
        # NOTE(review): the <No> payload looks like a base64 SQL select with a
        # fixed MainID; presumably the server keys the lookup on djsn instead
        # -- confirm.
        info_sql = "<root><No djsn='{}'>c2VsZWN0ICogZnJvbSBDdXN0b21Nb2R1bGVfMjAyMTA4MDIwNjAxIHdoZXJlIE1haW5JRD0nSkhDMDAwMzk4MTgn</No><No1>1</No1><No2>-1</No2><No3>MainID;lsh;sqrq;sqr;yggh;bm;zw;qjlx;qjsy;qjksrq;shi1;ksf;qjjsrq;shi2;jsf;gong;tspd</No3></root>"
        current_id = self._LEAVE_ID_RE.search(leave_node.text).group(0)
        cut_path, fields = self.__fetch_bill_fields(leave_node, info_sql)
        if fields is None:
            print("WebBill query failed, skipped: {}".format(current_id))
            return
        # Indexes follow the <No3> field list above (7 = qjlx, 9-11 = start,
        # 12-14 = end), assuming the response keeps that order.
        # NOTE(review): records whose leave-type field equals this literal are
        # skipped entirely -- confirm this is the intended filter.
        if fields[7] == "[调休]":
            return
        starttime = self.__bill_datetime(fields[9], fields[10], fields[11])
        endtime = self.__bill_datetime(fields[12], fields[13], fields[14])

        query = cut_path[45:]
        rely = self.session.get(
            urljoin(self.base_url, "JHSoft.Web.Module/ToolBar/toolbarwf.aspx?{}".format(query)),
            allow_redirects=True,
            headers={
                "Referer": urljoin(self.base_url, "JHSoft.Web.Module/fceform/common/djframe.htm?{}".format(query)),
                "Upgrade-Insecure-Requests": "1"
            })
        if not rely.ok:
            raise Exception("rely error")
        app_id_node = BeautifulSoup(rely.text, features="lxml").find(id="hidRelevanceApproveID")
        rely_app_id = app_id_node.get('value') if app_id_node is not None else None
        if not rely_app_id:
            raise Exception("rely error")

        relyinfo = self.session.get(
            urljoin(self.base_url, "JHSoft.Web.WorkFlow/WorkFlow/MyWorkFlowWatchRelevanceAjax.aspx"),
            params={
                "delFlag": 0,
                "strAppID": rely_app_id
            })
        if not relyinfo.ok:
            raise Exception("rely error")
        relyinfo_soup = BeautifulSoup(relyinfo.text, features="lxml")
        used_jb = [
            self._WORK_OVER_ID_RE.search(tr.contents[0].text).group(0)
            for tr in relyinfo_soup.find_all(id="ReleAppID{}".format(rely_app_id))
        ]
        # Stored in one step so a record never exists without "used_work".
        self.leave[current_id] = {
            "starttime": starttime,
            "endtime": endtime,
            "spend": self.__total_time_excluding_intervals(starttime, endtime, INTERVALS),
            "used_work": used_jb
        }

    def __single_work_over_parse(self, work_node):
        """Read one JBTJ row and store the time span of each detail row.

        Raises:
            Exception: ``"djframe error"`` when the decode request fails.
        """
        # NOTE(review): same fixed-MainID payload caveat as the leave query.
        info_sql = "<root><No djsn='{}'>c2VsZWN0ICogZnJvbSBDdXN0b21Nb2R1bGVfMjAyMTA4MDIwNzAyIGEgaW5uZXIgam9pbiBDdXN0b21Nb2R1bGVfU3ViRGV0YWlsXzIwMjEwODAyMDcwMiBiIG9uIChhLk1haW5JRD1iLk1haW5JRCkgIHdoZXJlIGEuTWFpbklEPSdKSEMwMDAzOTI5NSc=</No><No1>1</No1><No2>-1</No2><No3>MainID;lb;kssj;kssj_shi;kssj_fen;jssj;jssj_shi;jssj_fen;bz</No3></root>"
        current_id = self._WORK_OVER_ID_RE.search(work_node.text).group(0)
        print(current_id)
        _, fields = self.__fetch_bill_fields(work_node, info_sql)
        if fields is None:
            print("WebBill query failed, skipped: {}".format(current_id))
            return
        width = self._WORK_OVER_ROW_WIDTH
        rs = []
        # Walk complete rows only; a trailing partial row is ignored instead of
        # raising IndexError.
        for q in range(0, len(fields) - width + 1, width):
            starttime = self.__bill_datetime(fields[q + 2], fields[q + 3], fields[q + 4])
            endtime = self.__bill_datetime(fields[q + 5], fields[q + 6], fields[q + 7])
            rs.append({
                "start": starttime,
                "end": endtime,
                "spend": self.__total_time_excluding_intervals(starttime, endtime, INTERVALS)
            })
        self.work_over[current_id] = rs

    def __intersection_time(self, start1, end1, start2, end2):
        """Return the overlap of two time ranges as ``(start, end)``.

        The ranges do not overlap when the returned start is not before the
        returned end.
        """
        return max(start1, start2), min(end1, end2)

    def __total_time_excluding_intervals(self, start, end, intervals):
        """Return ``end - start`` minus the overlap with the daily windows.

        Args:
            start, end: datetimes delimiting the measured span.
            intervals: iterable of ``(begin, finish)`` datetimes; only their
                time of day is used.

        The windows are applied on every calendar day from ``start`` to
        ``end``, so a span crossing midnight loses the breaks of each day it
        touches, not just those of the first day.
        """
        total_duration = end - start
        day = start.date()
        last_day = end.date()
        while day <= last_day:
            for window_begin, window_finish in intervals:
                t1 = datetime.combine(day, window_begin.time())
                t2 = datetime.combine(day, window_finish.time())
                inter_start, inter_end = self.__intersection_time(start, end, t1, t2)
                if inter_start < inter_end:  # the span overlaps this window
                    total_duration -= (inter_end - inter_start)
            day += timedelta(days=1)
        return total_duration

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()
def allocate_leave_to_overtime(works, leaves):
    """Charge each leave record against the overtime records it is linked to.

    Args:
        works: ``{overtime id: [{"spend": timedelta, ...}, ...]}``.
        leaves: ``{leave id: {"spend": timedelta, "used_work": [overtime id,
            ...], ...}}``. A missing ``"used_work"`` counts as no links.

    Returns:
        ``{overtime id: {"spend", "used_leave", "remaining"}}`` where
        ``spend`` is the summed duration of the overtime record,
        ``used_leave`` lists ``{"id": leave id, "spend": timedelta}`` charges
        and ``remaining`` is what is left after those charges.

    A leave is charged to its linked overtime records in order, taking at most
    what each one has left, until the leave is fully covered. Linked ids that
    are absent from ``works`` are skipped.
    """
    zero = timedelta(0)
    result_works = {}
    for work_id, segments in works.items():
        spend_time = sum((segment["spend"] for segment in segments), zero)
        result_works[work_id] = {
            "spend": spend_time,
            "used_leave": [],
            "remaining": spend_time
        }
    for leave_id, leave in leaves.items():
        outstanding = leave["spend"]
        for work_id in leave.get("used_work", []):
            if outstanding <= zero:
                break
            entry = result_works.get(work_id)
            if entry is None:
                continue
            taken = min(outstanding, entry["remaining"])
            entry["used_leave"].append({
                "id": leave_id,
                "spend": taken
            })
            entry["remaining"] -= taken
            outstanding -= taken
    return result_works


if __name__ == '__main__':
    # NOTE(review): the literals below are the credentials that were hard-coded
    # here; prefer supplying OA_USERNAME / OA_PASSWORD through the environment
    # and removing the defaults from the source.
    test = OA(
        BASE_URL,
        os.environ.get("OA_USERNAME", "120082"),
        os.environ.get("OA_PASSWORD", "7948799"),
    )
    try:
        test.login()
        test.works()
    finally:
        # Release the session even when login or scraping raises.
        test.close()
    print(allocate_leave_to_overtime(test.work_over, test.leave))