网站首页 资讯 热点 行情 地区 推荐 民宿 酒店 家居 度假 滚动
首页 >  热点 >  >  正文

一个读取excel数据处理完成后读入数据库的例子

2023-05-23 09:09:29来源:爱好不执念


(资料图片仅供参考)

最近收集了一批数据,各地根据问题数据做出反馈,但是各地在反馈的时候字段都进行了创新,好在下发的数据内容并没有改变,开始写的单进程的,由于时间较长,耗时380+秒,又改成多进程的,时间缩短为80-秒。现在把程序发出来,请各位大神进行指正。

import multiprocessingimport osimport timeimport pandas as pdfrom sqlalchemy import create_engineimport asyncioimport  warnings# warnings.simplefilter("ignore")ywstrList=["经办机构", "原子业务编号", "原子业务名称", "风险名称","风险描述",           "校验规则结果", "创建时间", "风险提示信息", "业务日志号"]ywstrListMemo=["经办机构", "原子业务编号", "原子业务名称", "风险名称","风险描述",           "校验规则结果", "创建时间", "风险提示信息", "业务日志号","memo","time"]szlist = ["省本", "成都", "自贡", "攀枝", "泸州", "德阳", "绵阳", "广元",              "遂宁", "内江", "乐山", "南充", "眉山", "宜宾", "广安", "达州",              "雅安", "巴中", "资阳", "阿坝", "甘孜", "凉山"]# connect = create_engine("mysql+pymysql://root:@127.0.0.1:3306/ywgk?charset=utf8")connect = create_engine("mysql+mysqlconnector://root:@127.0.0.1:3306/ywgk?charset=utf8")# engine = create_engine("mysql+mysqlconnector://scott:tiger@localhost/foo")def findSZ(filename):    for sz in szlist:        if filename.find(sz) != -1:            return sz    return Nonedef ReadExcel(filename):    xlsdf = ""    xlsdf = pd.read_excel(filename)    """"    remove columns="sz" or "市"    """    if "市" in list(xlsdf.keys().to_list()):        xlsdf.drop(columns="市", axis=1, inplace=True)    if "sz" in list(xlsdf.keys().to_list()):        xlsdf.drop(columns="sz", axis=1, inplace=True)    xlsdf = xlsdf.fillna("").astype("string")    return xlsdfdef filterDataOfSz(filename, xlsdf):    sz = findSZ(filename)    print(sz)    """    筛选出包含对应市州的数据。    """    if sz != None:        xlsdf = xlsdf[xlsdf["经办机构"].str[:2] == sz]  # 筛出本市州数据        return xlsdfdef ConCatRestCols(xlsdf):    """    去掉业务部分字段,保留市州反馈意见。    """    print(xlsdf)    # if xlsdf==None:    #     print(filename+"为空,需要处理")    #     return    xlfdf_keys_set = set(xlsdf.keys().to_list())    xlsdf_restkeys_set = xlfdf_keys_set - set(ywstrList)    xlsdf_restkeys_list = list(xlsdf_restkeys_set)    xls_rest_df = xlsdf.loc[:, xlsdf_restkeys_list]  # 可以正确操作    xlsdf["memo"] = "#"    for col in xlsdf_restkeys_list:        xlsdf["memo"] += xlsdf[col]    #    return xlsdfdef SetTimeStamp(filename, xlsdf):    xlsdf["time"] = os.stat(str(filename)).st_mtime    return xlsdfasync def ProcessExcelAndtosql(filename, table):    df = ReadExcel(filename)    df = filterDataOfSz(filename=filename, xlsdf=df)    df = ConCatRestCols(df)    df = SetTimeStamp(filename=filename, xlsdf=df)    df = df.loc[:, ywstrListMemo]    print(filename)    print(df)    df.to_sql(name=table, con=connect, if_exists="append", index=False, chunksize=1000, method="multi")def profile(func):    def wrapers(*args,**kwargs):        print("测试开始")        begin=time.time()        func(*args,**kwargs)        end=time.time()        print(f"耗时{end-begin}秒")    return wrapers# async def getmsg(msg):#     print(f"#{msg}")#     await asyncio.sleep(1)def getFiles(src:str):    import pathlib    files=[]    for file in  pathlib.Path(src).rglob("*.xls?"):        files.append(str(file))    return filesdef process_asyncio(files,table):    loop=asyncio.new_event_loop()    tasks=[loop.create_task(ProcessExcelAndtosql(filename,table)) for filename in files]    loop.run_until_complete(asyncio.wait(tasks))@profiledef run(iterable,table):    process_count = multiprocessing.cpu_count()    # print(process_count)    pool = multiprocessing.Pool(process_count-2)    iterable=get_chunks(iterable, process_count)    for lst in iterable:        pool.apply_async(process_asyncio, args=(lst,table))    pool.close()    pool.join()def main():    files = getFiles(r"e:\市州返回")    run(files, "ywgk3")def get_chunks(iterable,num):    # global iterable    import numpy as np    return np.array_split(iterable, num)# import  profileif __name__=="__main__":        main()                   

本人只是编程的业余爱好者,只是把技术用于辅助工作,并没有深入研究技术理论,都是野路子,还请批评指正。

标签:

相关文章

[ 相关新闻 ]