同行1800多公里,跟着货车司机跑长途(人民眼·货车司机
图①:山西临汾经济技术开发区兴荣供应链有限公司的货车整装待发。资料图片 图②:司机王勇平驾驶货车行驶在
(资料图片仅供参考)
最近收集了一批数据,各地根据问题数据做出反馈,但是各地在反馈的时候字段都进行了创新,好在下发的数据内容并没有改变,开始写的单进程的,由于时间较长,耗时380+秒,又改成多进程的,时间缩短为80-秒。现在把程序发出来,请各位大神进行指正。
import multiprocessingimport osimport timeimport pandas as pdfrom sqlalchemy import create_engineimport asyncioimport warnings# warnings.simplefilter("ignore")ywstrList=["经办机构", "原子业务编号", "原子业务名称", "风险名称","风险描述", "校验规则结果", "创建时间", "风险提示信息", "业务日志号"]ywstrListMemo=["经办机构", "原子业务编号", "原子业务名称", "风险名称","风险描述", "校验规则结果", "创建时间", "风险提示信息", "业务日志号","memo","time"]szlist = ["省本", "成都", "自贡", "攀枝", "泸州", "德阳", "绵阳", "广元", "遂宁", "内江", "乐山", "南充", "眉山", "宜宾", "广安", "达州", "雅安", "巴中", "资阳", "阿坝", "甘孜", "凉山"]# connect = create_engine("mysql+pymysql://root:@127.0.0.1:3306/ywgk?charset=utf8")connect = create_engine("mysql+mysqlconnector://root:@127.0.0.1:3306/ywgk?charset=utf8")# engine = create_engine("mysql+mysqlconnector://scott:tiger@localhost/foo")def findSZ(filename): for sz in szlist: if filename.find(sz) != -1: return sz return Nonedef ReadExcel(filename): xlsdf = "" xlsdf = pd.read_excel(filename) """" remove columns="sz" or "市" """ if "市" in list(xlsdf.keys().to_list()): xlsdf.drop(columns="市", axis=1, inplace=True) if "sz" in list(xlsdf.keys().to_list()): xlsdf.drop(columns="sz", axis=1, inplace=True) xlsdf = xlsdf.fillna("").astype("string") return xlsdfdef filterDataOfSz(filename, xlsdf): sz = findSZ(filename) print(sz) """ 筛选出包含对应市州的数据。 """ if sz != None: xlsdf = xlsdf[xlsdf["经办机构"].str[:2] == sz] # 筛出本市州数据 return xlsdfdef ConCatRestCols(xlsdf): """ 去掉业务部分字段,保留市州反馈意见。 """ print(xlsdf) # if xlsdf==None: # print(filename+"为空,需要处理") # return xlfdf_keys_set = set(xlsdf.keys().to_list()) xlsdf_restkeys_set = xlfdf_keys_set - set(ywstrList) xlsdf_restkeys_list = list(xlsdf_restkeys_set) xls_rest_df = xlsdf.loc[:, xlsdf_restkeys_list] # 可以正确操作 xlsdf["memo"] = "#" for col in xlsdf_restkeys_list: xlsdf["memo"] += xlsdf[col] # return xlsdfdef SetTimeStamp(filename, xlsdf): xlsdf["time"] = os.stat(str(filename)).st_mtime return xlsdfasync def ProcessExcelAndtosql(filename, table): df = ReadExcel(filename) df = filterDataOfSz(filename=filename, xlsdf=df) df = ConCatRestCols(df) df = SetTimeStamp(filename=filename, xlsdf=df) df = df.loc[:, ywstrListMemo] print(filename) print(df) df.to_sql(name=table, con=connect, if_exists="append", index=False, chunksize=1000, method="multi")def profile(func): def wrapers(*args,**kwargs): print("测试开始") begin=time.time() func(*args,**kwargs) end=time.time() print(f"耗时{end-begin}秒") return wrapers# async def getmsg(msg):# print(f"#{msg}")# await asyncio.sleep(1)def getFiles(src:str): import pathlib files=[] for file in pathlib.Path(src).rglob("*.xls?"): files.append(str(file)) return filesdef process_asyncio(files,table): loop=asyncio.new_event_loop() tasks=[loop.create_task(ProcessExcelAndtosql(filename,table)) for filename in files] loop.run_until_complete(asyncio.wait(tasks))@profiledef run(iterable,table): process_count = multiprocessing.cpu_count() # print(process_count) pool = multiprocessing.Pool(process_count-2) iterable=get_chunks(iterable, process_count) for lst in iterable: pool.apply_async(process_asyncio, args=(lst,table)) pool.close() pool.join()def main(): files = getFiles(r"e:\市州返回") run(files, "ywgk3")def get_chunks(iterable,num): # global iterable import numpy as np return np.array_split(iterable, num)# import profileif __name__=="__main__": main()
本人只是编程的业余爱好者,只是把技术用于辅助工作,并没有深入研究技术理论,都是野路子,还请批评指正。
标签:
图①:山西临汾经济技术开发区兴荣供应链有限公司的货车整装待发。资料图片 图②:司机王勇平驾驶货车行驶在
2022年北京冬奥会的筹办过程,为中国冰雪运动发展提供了巨大动力。科技创新,成为中国冰雪运动前进道路上嘹亮的号角。在科学技术部社会发展
游客在银川市黄河横城旅游度假区观看花灯展(2月5日摄)。春节假期,“2022黄河横城冰雪彩灯艺术节”在宁夏银川市
新华社香港2月6日电题:狮子山下的舞狮人新华社记者韦骅“左眼精,右眼灵,红光万象,富贵繁荣!”“口食八方财,
正在进行围封或强制检测的葵涌邨居民在登记(资料照片)。新华社发新华社香港2月6日电 题:凝聚香港社会共克时艰
2月6日,航拍青海省西宁市雪后美景。受较强冷空气影响,2月5日至6日,青海迎来大范围降雪天气过程,古城西宁银装
[ 相关新闻 ]