国家建设厅官方网站,wordpress4.9.1下载,科技有限公司经营范围,华为商城官网手机版app前段时间有位客户问: 你们的程序能不能给我们生成个 txt 文件,把新增的员工都放进来,字段也不需要太多,就要 员工姓名/卡号/员工编号/员工职位/公司 这些字段就行了,然后我们的程序会去读取这个 txt 文件,拿里面的内容,读完之后会这个文件删掉 我: 可以接受延迟吗?可能没办法实…前段时间有位客户问: 你们的程序能不能给我们生成个 txt 文件,把新增的员工都放进来,字段也不需要太多,就要 员工姓名/卡号/员工编号/员工职位/公司 这些字段就行了,然后我们的程序会去读取这个 txt 文件,拿里面的内容,读完之后会这个文件删掉 我: 可以接受延迟吗?可能没办法实时生成,写个脚本,用定时任务去跑还是可以实现的 客户: 可以呀,那个脚本 30mins 跑一次就行 这篇文章就记录一下大概实现思路,后续如果遇到类似需求,就可以直接 copy 代码了
实现思路:
定义全局变量 update_time判断 update_time 是否为空 为空,说明是第一次查询数据,取查到的最后一条记录的 create_time ,赋值给 update_time不为空,说明不是第一次查询数据,查询数据时, create_time update_time ,同时更新 update_time 判断 txt 文件是否存在 存在,则 txt 表头不需要重新生成不存在, txt 表头需要重新生成
其实逻辑很简单, Python 类库也很丰富,接下来就上代码:
import pymysql
import os
import argparse
import schedule
import datetimeupdate_time def handle_variables(server_ip,password):global real_ip real_ip server_ipglobal root_password root_password passworddef get_person_info():connection pymysql.connect(host real_ip,user root,password root_password,db xxx)try:global update_timecur connection.cursor()if update_time :sql select name,card_num,code,postion,company_id,gmt_create from personelse :sql select name,card_num,code,postion,company_id,gmt_create from person where gmt_create update_time cur.execute(sql)query_person_info_list cur.fetchall()# get the time of the last recordif len(query_person_info_list) ! 0:temp_list query_person_info_list[-1]update_time temp_list[-1]if isinstance(update_time, datetime.datetime):update_time update_time.strftime(%Y-%m-%d %H:%M:%S)# if the txt not exists, add new textif not os.path.exists(add_person.txt) :with open(add_person.txt, w) as f:f.write(Lastname, Firstname, CardNumber, Employee ID, Designation, Department \n)for person_info in query_person_info_list :person_name person_info[0]# the name is generally first last, which needs to be convertedperson_name_list person_name.split()person_first_name person_name_list[0]start_length len(person_first_name) 1person_last_name person_name[start_length:]f.write(str(person_last_name) , str(person_first_name) , )card_number person_info[1]employee_id person_info[2]designation person_info[3]department person_info[4]if card_number is not None :f.write(str(card_number) , )else :f.write(, )if employee_id is not None :f.write(str(employee_id) , )else :f.write(, )if designation is not None :f.write(str(designation) , )else :f.write(, )if department is not None :# get department namedepartment_sql select name from zone where id str(department)cur.execute(department_sql)department_result cur.fetchone()department_name department_result[0]f.write(str(department_name))else :f.write(, )f.write(\n)# if the txt exists, update the textelse :# get original datawith open(add_person.txt, r) as e:data e.readlines()with open(add_person.txt, w) as f:# first save the original data for write_data in data :f.write(write_data)# then save new datafor person_info in query_person_info_list :person_name person_info[0]# the name is generally first last, which needs to be convertedperson_name_list person_name.split()person_first_name person_name_list[0]start_length len(person_first_name) 1person_last_name person_name[start_length:]f.write(str(person_last_name) , str(person_first_name) , )card_number person_info[1]employee_id person_info[2]designation person_info[3]department person_info[4]if card_number is not None :f.write(str(card_number) , )else :f.write(, )if employee_id is not None :f.write(str(employee_id) , )else :f.write(, )if designation is not None :f.write(str(designation) , )else :f.write(, )if department is not None :# get department namedepartment_sql select name from zone where id str(department)cur.execute(department_sql)department_result cur.fetchone()department_name department_result[0]f.write(str(department_name))else :f.write(, )f.write(\n)except IOError:print(Error: get update person info fail)finally:cur.close()connection.close()if __name__ __main__:parser argparse.ArgumentParser(descriptionplease enter)parser.add_argument(--server_ip, -server, helpserver ip, default, typestr, requiredFalse)parser.add_argument(--password, -p, helpdevops infra password, default, typestr, requiredFalse)args parser.parse_args()# let the variable become a global variable, no need to pass ithandle_variables(args.server_ip, args.password)# execute the method every 30mins#schedule.every(30).minutes.do(get_person_info)# for test -- execute the method every 30 secondsschedule.every(30).seconds.do(get_person_info)while True:# run task schedule.run_pending() 以上~