init.py#

首先,module其实就是一个.py文件,中文名为模块,其内置有各种函数和类与变量等。而package就是一个包含很多subpackage或者module(.py文件)的一个包。

img

一个directories 只有包含__init__.py文件才会被python识别成package。只有在import package时,才会执行package目录下的__init__.py文件。

若文件结构如下:

1
2
3
4
5
6
7
8
9
10
mypackage
——__init__.py
——subpackage_1
——__init__.py
——test11.py
——test12.py
——subpackage_2
——__init__.py
——test21.py
——test22.py

pickle, json#

都是四个函数:

1
2
3
4
5
6
7
8
9
10
pickle.dumps():将 Python 中的对象序列化成二进制对象,并返回;
pickle.loads():读取给定的二进制对象数据,并将其转换为 Python 对象;
pickle.dump():将 Python 中的对象序列化成二进制对象,并写入文件;
pickle.load():读取指定的序列化数据文件,并返回对象。


json.load()从文件中读取json字符串
json.loads()将json字符串转换为字典类型
json.dumps()将python中的字典类型转换为字符串类型
json.dump()将json格式字符串写到文件中

参考资料#

init.py文件与__all__变量

python模块中__init__.py的作用

Python:init.py文件和、all__、import、__name__、__doc

按天生成日志#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# #coding=utf-8
import logging,os # 引入logging模块
from com_tools import setting


from logging import handlers

class Logger(object):
level_relations = {
'debug':logging.DEBUG,
'info':logging.INFO,
'warning':logging.WARNING,
'error':logging.ERROR,
'crit':logging.CRITICAL
}#日志级别关系映射


def __init__(self,filename,level='info',when='MIDNIGHT',backCount=7,fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
self.logger = logging.getLogger(filename)
format_str = logging.Formatter(fmt)#设置日志格式
self.logger.setLevel(self.level_relations.get(level))#设置日志级别
sh = logging.StreamHandler()#往控制台输出
sh.setFormatter(format_str) #设置控制台上显示的格式
th = handlers.TimedRotatingFileHandler(filename=filename,interval=1,when=when,backupCount=backCount,encoding='utf-8')#往文件里写入#指定间隔时间自动生成文件的处理器
#实例化TimedRotatingFileHandler
#interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
# S 秒
# M 分
# H 小时、
# D 天、
# W 每星期(interval==0时代表星期一)
# midnight 每天凌晨
th.suffix = "%Y-%m-%d.log" #设置文件后缀
th.setFormatter(format_str)#设置文件里写入的格式
self.logger.addHandler(sh) #把对象加到logger里
self.logger.addHandler(th)
logfile = os.path.join(setting.logs_path, "daodianmockapi.txt") # 这个文件的名称就是当天的日志文件,过往的日志文件,会在后面追加文件后缀 th.suffix
logger = Logger(logfile,level='debug')

if __name__ == '__main__':
#logger = Logger('all.log',level='debug')
# filename = setting.now_time+ ".txt"
# logfile = os.path.join(setting.logs_path,filename)
# logger = Logger(logfile,level='debug')
logger.logger.debug('debug') # 括号内的内容即为日志的文本内容
logger.logger.info('info')
logger.logger.warning('警告')
logger.logger.error('报错')
logger.logger.critical('严重')
#Logger('error.log', level='error').logger.error('error')

loguru#

代码#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import logging
import os
from logging.handlers import TimedRotatingFileHandler


class TRLogger:
def __init__(self, logname, loglevel, logger):
# 创建一个logger
self.logger = logging.getLogger(logger)
self.logger.setLevel(logging.DEBUG)

# 创建一个handler,用于写入日志文件
if os.path.exists(os.path.join(os.getcwd(), logname)):
os.system("rm -rf {}".format(os.path.join(os.getcwd(), logname)))

fh = TimedRotatingFileHandler(logname, when='D', interval=1)
fh.setLevel(logging.DEBUG)

# 再创建一个handler,用于输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# 定义handler的输出格式
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

formatter = logging.Formatter('[%(asctime)s] - [logger name :%(name)s] - [%(filename)s file line:%(lineno)d] '
'- %(levelname)s: %(message)s')

fh.setFormatter(formatter)
ch.setFormatter(formatter)

# 给logger添加handler
self.logger.addHandler(fh)
self.logger.addHandler(ch)

def getlog(self):
return self.logger



class Logger_:
def __init__(self, logname, loglevel, logger):
# 创建一个logger
self.logger = logging.getLogger(logger)
self.logger.setLevel(logging.DEBUG)

# 创建一个handler,用于写入日志文件
if os.path.exists(os.path.join(os.getcwd(), logname)):
os.system("rm -rf {}".format(os.path.join(os.getcwd(), logname)))


fh = logging.FileHandler(logname)
fh.setLevel(logging.DEBUG)

# 再创建一个handler,用于输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# 定义handler的输出格式
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

formatter = logging.Formatter('[%(asctime)s] - [logger name :%(name)s] - [%(filename)s file line:%(lineno)d] '
'- %(levelname)s: %(message)s')

fh.setFormatter(formatter)
ch.setFormatter(formatter)

# 给logger添加handler
self.logger.addHandler(fh)
self.logger.addHandler(ch)

def getlog(self):
return self.logger

参考资料#

python定时任务最强框架APScheduler详细教程

Loguru — 最强大的 Python 日志记录器

python 调试方法

多GPU分配#

实现逻辑:寻找(可用显存 / 总显存)最大的的GPU,并优先安排任务

nvidia-smi可以很方便的获得GPU的各种详细信息。

首先获得可用的GPU数目,nvidia-smi -L | grep GPU |wc -l

然后获得GPU各自的总显存,nvidia-smi -q -d Memory | grep -A4 GPU | grep Total | grep -o ‘[0-9]+‘

最后获得GPU各自的可用显存,nvidia-smi -q -d Memory | grep -A4 GPU | grep Free | grep -o ‘[0-9]+‘

将(可用显存 / 总显存)另存为numpy数组,并使用np.argmax返回值即为可用GPU

1
2
3
4
5
6
7
8
9
10
11
12
def available_GPU(self):
import subprocess
import numpy as np
nDevice = int(subprocess.getoutput("nvidia-smi -L | grep GPU |wc -l"))
total_GPU_str = subprocess.getoutput("nvidia-smi -q -d Memory | grep -A4 GPU | grep Total | grep -o '[0-9]\+'")
total_GPU = total_GPU_str.split('\n')
total_GPU = np.array([int(device_i) for device_i in total_GPU])
avail_GPU_str = subprocess.getoutput("nvidia-smi -q -d Memory | grep -A4 GPU | grep Free | grep -o '[0-9]\+'")
avail_GPU = avail_GPU_str.split('\n')
avail_GPU = np.array([int(device_i) for device_i in avail_GPU])
avail_GPU = avail_GPU / total_GPU
return np.argmax(avail_GPU)

参考资料#

pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel

torch 多进程训练(详细例程)

PyTorch多进程分布式训练最简单最好用的实施办法

python并行编程 - GPU篇

Pytorch 分布式、多进程模块测试

多进程GPU调用问题

GPU加速02:超详细Python Cuda零基础入门教程,没有显卡也能学

使用python多GPU任务分配

Deep Learning:PyTorch 基于docker 容器的分布式训练实践

基本使用

  1. 迭代的形式

    使用tqdm()封装可迭代的对象:

    1
    2
    3
    4
    5
    6
    7
    from tqdm import tqdm
    from time import sleep

    text = ""
    for char in tqdm(["a", "b", "c", "d"]):
    sleep(0.25)
    text = text + char
    阅读全文 »

asyncio + yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#-*- coding:utf8 -*-
import asyncio

@asyncio.coroutine
def test(i):
print('test_1', i)
r = yield from asyncio.sleep(1)
print('test_2', i)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [test(i) for i in range(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把一个generator标记为coroutine类型,然后就把这个coroutine扔到EventLoop中执行。test()会首先打印出test_1,然后yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

阅读全文 »

偏函数

定义:偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数(类似于,装饰器decorator,对于函数进行二次包装,产生特殊效果;但又不同于装饰器,偏函数产生了一个新函数,而装饰器,可改变被装饰函数的函数入口地址也可以不影响原函数)

例一:

1
2
3
4
5
6
7
8
9
from functools import partial

def mod( n, m ):
return n % m

mod_by_100 = partial( mod, 100 )

print mod( 100, 7 ) # 2
print mod_by_100( 7 ) # 2

例二:

1
2
3
4
5
6
7
8
9
from functools import partial

bin2dec = partial( int, base=2 )
print bin2dec( '0b10001' ) # 17
print bin2dec( '10001' ) # 17

hex2dec = partial( int, base=16 )
print hex2dec( '0x67' ) # 103
print hex2dec( '67' ) # 103
阅读全文 »

python 多线程实现

首先,python的多线程是假的。

看一个例子来看看python多线程的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time

def say(name):
print('你好%s at %s' %(name,time.ctime()))
time.sleep(2)
print("结束%s at %s" %(name,time.ctime()))

def listen(name):
print('你好%s at %s' % (name,time.ctime()))
time.sleep(4)
print("结束%s at %s" % (name,time.ctime()))

if __name__ == '__main__':
t1 = threading.Thread(target=say,args=('tony',)) #创建线程对象,Thread是一个类,实例化产生t1对象,这里就是创建了一个线程对象t1
t1.start() #启动线程,线程执行
t2 = threading.Thread(target=listen, args=('simon',)) #这里就是创建了一个线程对象t2
t2.start()

print("程序结束=====================")
# 输出结果为
你好tony at Fri Aug 20 15:42:07 2021 --t1线程执行
你好simon at Fri Aug 20 15:42:07 2021 --t2线程执行
程序结束===================== --主线程执行
结束tony at Fri Aug 20 15:42:09 2021 --sleep之后,t1线程执行
结束simon at Fri Aug 20 15:42:11 2021 --sleep之后,t2线程执行

我们可以看到主线程的print并不是等t1,t2线程都执行完毕之后才打印的,这是因为主线程和t1,t2 线程是同时跑的。但是主进程要等非守护子线程结束之后,主线程才会退出。

阅读全文 »

由于python的GIL锁的存在,导致在多线程的时候,同一时间只能有一个线程在cpu上运行,而且是单个cpu上运行,不管cpu有多少核数。如果要充分利用多核cpu的资源,在python中大部分情况下需要使用多进程。

python多进程模块

Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的。

阅读全文 »

列表表达式

1
2
3
info = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
a = [i+1 for i in range(10)]
print(a)

多重循环

1
2
a = [(i,j) for i in range(4) for j in range(2)]
print(a)

多重循环+判断语句

1
2
3
4
5
6
ls = [('x',['open1','open1','open1']),('y',['open1','open1','open0']),('z',['open0','open0','open1'])]

def find(con):
return [j for i in con for j in i if j.isdigit()]

[w for w,con in ls if find(con)==['1','1','1']]
阅读全文 »

什么是装饰器

一个decorator只是一个带有一个函数作为参数并返回一个替换函数的闭包。
简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 未使用装饰器时
def outer(some_func):
def inner():
print ("before some_func")
ret = some_func() # 1
return ret + 1
return inner
def foo():
return 1
decorated = outer(foo)
decorated()

#使用装饰器时
def outer(some_func):
def inner():
print ("before some_func")
ret = some_func() # 1
return ret + 1
return inner
@outer
def foo():
return 1

foo()
阅读全文 »