一种新奇的线程队列轮询方法

利用隐藏的环回(loopback)网络连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import queue
import socket
import os
class PollableQueue(queue.Queue):
# 定义一种新的Queue,底层有一对互联的socket
def __init__(self):
super().__init__()
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# non-POSIX 系统
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
# 创建一个服务器socket,之后立刻创建客户端socket并连接到服务器上
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
# 返回套接字的文件描述符
return self._getsocket.fileno()
def put(self, item):
super().put(item)
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return super().get()
if __name__ == '__main__':
import select
import threading
import time
def consumer(queues):
while True:
# 使用select轮询
can_read, _, _ = select.select(queues,[],[])
for r in can_read:
item = r.get()
print('Got:', item)
q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()
t = threading.Thread(target=consumer, args=([q1,q2,q3],))
t.daemon = True
t.start()
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)
time.sleep(1)

尽管底层的IO会带来一点点负载。但是
如果不采用这样的socket技术,另一个选择就是遍历所有的队列,分辨每个队列是否为空,还要用上定时器。
但是这样不仅麻烦,如果跟其他轮询对象比如socket一起使用的话。不能在同一地位上。

每天一道编程题——优雅的点

小易有一个圆心在坐标原点的圆,小易知道圆的半径的平方。小易认为在圆上的点而且横纵坐标都是整数的点是优雅的,小易现在想寻找一个算法计算出优雅的点的个数,请你来帮帮他。
例如:半径的平方如果为25
优雅的点就有:(+/-3, +/-4), (+/-4, +/-3), (0, +/-5) (+/-5, 0),一共12个点。

输入描述:
输入为一个整数,即为圆半径的平方,范围在32位int范围内。
输出描述:
输出为一个整数,即为优雅的点的个数

输入例子:
25
输出例子:
12

1
2
3
4
5
6
7
8
from math import sqrt
def func(r2):
count,r=0,sqrt(r2)
for i in range(int(r)+1):
t_=sqrt(r2-i**2)
if round(t_)==t_:
count+=2 if i==0 or i==r else 4
return count


来自网易2017秋招

浅谈Python中的描述符(Descriptor)

简单点

1
2
3
4
5
6
7
8
class Descriptor:
def __get__(self, instance, owner):
print('__get__ called. '
class foo:
t = Descriptor()
if __name__ == '__main__':
c = foo()
c.t #print __get__ called.

详细点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import types
class Descriptor:
def __call__(self, *args, **kwargs):
print('Descriptor called')
def __get__(self, instance, owner):
if instance is not None:
print('__get__ called. ' + 'instance is ' + str(instance) + '; owner is ' + str(owner))
return types.MethodType(self, instance)
else:
print('__get__ called. ' + 'instance is ' + str(instance) + '; owner is ' + str(owner))
return self
def __set__(self, instance, value):
print('set ' + str(instance) + ';value: ' + str(value))
class foo:
t = Descriptor()
class bar:
def __init__(self):
self.t=Descriptor()
if __name__ == '__main__':
c = foo()
print('test 1')
# 相当于foo.__dict__['t'].__get__(None, foo)
foo.t
print('test 2')
# 相当于foo.__dict__['t'].__get__(t, foo)
c.t
print('test 3')
c.t()
print('test 4')
c.t = 1
print('test 5')
d=bar()
d.t
'''
test 1
__get__ called. instance is None; owner is <class '__main__.foo'>
test 2
__get__ called. instance is <__main__.foo object at 0x00000273839F4320>; owner is <class '__main__.foo'>
test 3
__get__ called. instance is <__main__.foo object at 0x00000273839F4320>; owner is <class '__main__.foo'>
Descriptor called
test 4
set <__main__.foo object at 0x00000273839F4320>;value: 1
test 5
'''

有什么用

如果想创建一个新形式的实例属性,可以以描述符类的形式定义其功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Integer:
def __init__(self, name):
self.name = name
def __get__(self, instance, cls):
if instance is None:
return self
else:
return instance.__dict__[self.name]
def __set__(self, instance, value):
if not isinstance(value, int):
raise TypeError('Expected an int')
instance.__dict__[self.name] = value
def __delete__(self, instance):
del instance.__dict__[self.name]
class Point:
x = Integer('x')
y = Integer('y')
def __init__(self, x, y):
self.x = x
self.y = y
if __name__ == '__main__':
p = Point(2, 3)
print(p.x) # calls Point.x.__get__(p,Point)
p.y = 5
try:
p.x = 2.3
except TypeError as e:
print(e)

如果仅仅是像访问某个特定类中的一个属性,并对此做定制化处理,直接用property更加简单。
在需要大量重用代码的情况下,描述符更加有用。

利用Python中的元类实现缓存实例(cached instance)

关于缓存实例(cached instance),可以参考Python中的弱引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import weakref
class Cached(type):
def __init__(self, *args, **kwargs):
super(Cached, self).__init__(*args, **kwargs)
# 建立一个value为弱引用的对象
self.__cache = weakref.WeakValueDictionary()
def __call__(self, *args):
if args in self.__cache:
return self.__cache[args]
else:
# 注意,这里的self为Spam.Spam已经通过元类创建了__cache对象
obj = super(Cached, self).__call__(*args)
self.__cache[args] = obj
return obj
class Spam(metaclass=Cached):
def __init__(self, name):
print('Creating Spam({!r})'.format(name))
self.name = name
if __name__ == '__main__':
a = Spam('foo')
b = Spam('bar')
print('a is b:', a is b)
c = Spam('foo')
print('a is c:', a is c)

利用Python中的元类实现单例模式


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Singleton(type):
def __init__(self, *args, **kwargs):
self.__instance = None
# 如果是Python2
# super(Singleton,self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
if self.__instance is None:
self.__instance = super().__call__(*args, **kwargs)
return self.__instance
else:
return self.__instance
class Spam(metaclass=Singleton):
# 如果是Python2
# __metaclass__=Singleton
def __init__(self):
print('Creating Spam')
if __name__ == '__main__':
a = Spam()
b = Spam()
print(a is b)

Python中的弱引用

weakref模块具有的方法

class weakref.ref(object[, callback])
创建一个弱引用对象,object是被引用的对象,callback是回调函数(当被引用对象被删除时的,会调用改函数)。

weakref.proxy(object[, callback])
创建一个用弱引用实现的代理对象,参数同上。

weakref.getweakrefcount(object)
获取对象object关联的弱引用对象数

weakref.getweakrefs(object)
获取object关联的弱引用对象列表

class weakref.WeakKeyDictionary([dict])
创建key为弱引用对象的字典

class weakref.WeakValueDictionary([dict])
创建value为弱引用对象的字典

class weakref.WeakSet([elements])
创建成员为弱引用对象的集合对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
### 创建弱引用
>>> from socket import *
>>> import weakref
>>> s=socket(AF_INET,SOCK_STREAM)
>>> ref=weakref.ref(s)
>>> s
<socket._socketobject instance at 007B4A94>
>>> ref
<weakref at 0x81195c; to 'instance' at 0x7b4a94>
>>> ref() #调用它来访问被引用的对象
<socket.socketobject instance at 007B4A94>
一旦没有了对这个对象的其它的引用,调用弱引用将返回None,因为Python已经销毁了这个对象。 注意:大部分的对象不能通过弱引用来访问。
### 创建代理对象
>>> from socket import*
>>> import weakref
>>> s=socket(AF_INET,SOCK_STREAM)
>>> ref=weakref.proxy(s)
>>> s
<socket._socketobject instance at 007E4874>
>>> ref
<socket._socketobject instance at 007E4874>
>>> ref.close() #对象的方法同样工作

例子

创建缓存实例
当创建类实例时,我们想反悔一个缓存引用,让其指向上一个用同样参数(如果有的话)创建出的类实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import sys
class Spam:
def __init__(self, name):
self.name = name
import weakref
_spam_cache = weakref.WeakValueDictionary()
def get_spam(name):
if name not in _spam_cache:
s = Spam(name)
_spam_cache[name] = s
else:
s = _spam_cache[name]
return s
if __name__ == '__main__':
a = get_spam('foo')
b = get_spam('bar')
print(sys.getrefcount(a))
print('a is b:', a is b)
c = get_spam('foo')
print(sys.getrefcount(a))
print('a is c:', a is c)
# 2
# a is b: False
# 3
# a is c: True
print(list(_spam_cache))
del a
print(list(_spam_cache))
del c
print(list(_spam_cache))
# ['foo', 'bar']
# ['foo', 'bar']
# ['bar']

封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import weakref
class CachedSpamManager:
def __init__(self):
self._cache = weakref.WeakValueDictionary()
def get_spam(self, name):
if name not in self._cache:
s = Spam(name)
self._cache[name] = s
else:
s = self._cache[name]
return s
class Spam:
def __init__(self, name):
self.name = name
Spam.manager = CachedSpamManager()
def get_spam(name):
return Spam.manager.get_spam(name)
if __name__ == '__main__':
a = get_spam('foo')
b = get_spam('bar')
print('a is b:', a is b)
c = get_spam('foo')
print('a is c:', a is c)

实现带有状态的对象


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 一个naive的例子
class Connection:
def __init__(self):
self.state = 'CLOSED'
def read(self):
if self.state != 'OPEN':
raise RuntimeError('Not open')
print('reading')
def write(self):
if self.state != 'OPEN':
raise RuntimeError('Not open')
print('writing')
def open(self):
if self.state == 'OPEN':
raise RuntimeError('Already open')
self.state = 'OPEN'
def close(self):
if self.state == 'CLOSED':
raise RuntimeError('Already CLOSED')
self.state = 'CLOSED'

这样很不优雅,而且包含了大量重复的代码。
我们的目的是进行状态判断,那么我们本就可以把OPEN跟CLOSE状态各自封装成一个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Connection state 的基类
class ConnectionState:
@staticmethod
def read(conn):
raise NotImplementedError()
@staticmethod
def write(conn, data):
raise NotImplementedError()
@staticmethod
def open(conn):
raise NotImplementedError()
@staticmethod
def close(conn):
raise NotImplementedError()
# 不同状态
class ClosedConnectionState(ConnectionState):
@staticmethod
def read(conn):
raise RuntimeError('Not open')
@staticmethod
def write(conn, data):
raise RuntimeError('Not open')
@staticmethod
def open(conn):
conn.new_state(OpenConnectionState)
@staticmethod
def close(conn):
raise RuntimeError('Already closed')
class OpenConnectionState(ConnectionState):
@staticmethod
def read(conn):
print('reading')
@staticmethod
def write(conn, data):
print('writing')
@staticmethod
def open(conn):
raise RuntimeError('Already open')
@staticmethod
def close(conn):
conn.new_state(ClosedConnectionState)
class Connection:
def __init__(self):
self.new_state(ClosedConnectionState)
def new_state(self, newstate):
self._state = newstate
def read(self):
return self._state.read(self)
def write(self, data):
return self._state.write(self, data)
def open(self):
return self._state.open(self)
def close(self):
return self._state.close(self)
if __name__ == '__main__':
c = Connection()
print(c)
try:
c.read()
except RuntimeError as e:
print(e)
c.open()
print(c)
c.read()
c.close()
print(c)

这里的每种状态都用类和静态方法来实现,在每个静态方法中都把Connection类的实例作为第一个参数。这是因为我们在不同的状态类中不保存任何实例数据,所有的实例数据都应该保存在Connection实例中。
再看看第二种方法,直接修改实例的class属性
这里不再将Connection和COnnectionState作为单独的类来实现,现在我们将他们合并在一起。随着状态的改变,实例也会修改自己的类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class Connection:
def __init__(self):
self.new_state(ClosedConnection)
def new_state(self, state):
self.__class__ = state
def read(self):
raise NotImplementedError()
def write(self, data):
raise NotImplementedError()
def open(self):
raise NotImplementedError()
def close(self):
raise NotImplementedError()
class ClosedConnection(Connection):
def read(self):
raise RuntimeError('Not open')
def write(self, data):
raise RuntimeError('Not open')
def open(self):
self.new_state(OpenConnection)
def close(self):
raise RuntimeError('Already closed')
class OpenConnection(Connection):
def read(self):
print('reading')
def write(self, data):
print('writing')
def open(self):
raise RuntimeError('Already open')
def close(self):
self.new_state(ClosedConnection)
if __name__ == '__main__':
c = Connection()
print(c)
try:
c.read()
except RuntimeError as e:
print(e)
c.open()
print(c)
c.read()
c.close()
print(c)

聪明的同学可能会看出,这TM就是设计模式中的状态模式嘛。

在回调函数中携带额外的状态


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def apply_async(func, args, *, callback):
# 执行函数
result = func(*args)
# 回调函数介入
callback(result)
# 例子
def add(x, y):
return x + y
# (a) 一个简单的回调函数例子
print('# --- Simple Example')
def print_result(result):
print("Got:", result)
apply_async(add, (2, 3), callback=print_result)
apply_async(add, ('hello', 'world'), callback=print_result)
# (b) 使用绑定方法 (bound method)
print('# --- Using a bound-method')
class ResultHandler:
def __init__(self):
self.sequence = 0
def handler(self, result):
self.sequence += 1
print('[{}] Got: {}'.format(self.sequence, result))
r = ResultHandler()
apply_async(add, (2, 3), callback=r.handler)
apply_async(add, ('hello', 'world'), callback=r.handler)
# (c) 使用闭包
print('# --- Using a closure')
def make_handler():
sequence = 0
def handler(result):
# python3里面的nonlocal
nonlocal sequence
sequence += 1
print('[{}] Got: {}'.format(sequence, result))
return handler
handler = make_handler()
apply_async(add, (2, 3), callback=handler)
apply_async(add, ('hello', 'world'), callback=handler)
# (d) 使用协程
print('# --- Using a coroutine')
def make_handler():
sequence = 0
while True:
result = yield
sequence += 1
print('[{}] Got: {}'.format(sequence, result))
handler = make_handler()
next(handler) # 初始化
apply_async(add, (2, 3), callback=handler.send)
apply_async(add, ('hello', 'world'), callback=handler.send)
# (e) 使用partial
print('# --- Using partial')
class SequenceNo:
def __init__(self):
self.sequence = 0
def handler(result, seq):
seq.sequence += 1
print('[{}] Got: {}'.format(seq.sequence, result))
seq = SequenceNo()
from functools import partial
apply_async(add, (2, 3), callback=partial(handler, seq=seq))
apply_async(add, ('hello', 'world'), callback=partial(handler, seq=seq))

Python闭包的奇技淫巧

这种奇技淫巧,很怪异。相比一个真正的类,像继承,属性,描述符,或者类方法这样的特性在这种方法中是无法使用的。
从全局的角度考虑, 为闭包增加方法可能会有更多的实际用途,比如我们想重置内部状态,刷新缓冲区,清除缓存或者实现某种形式的反馈机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# coding:utf-8
import sys
class ClosureInstance:
def __init__(self, locals=None):
if locals is None:
# 获得上一层
locals = sys._getframe(1).f_locals
# 遇到可调用的(函数),更新实例的字典。
self.__dict__.update((key, value) for key, value in locals.items()
if callable(value))
print self.__dict__
# 重定向len方法
def __len__(self):
!['__len__'](./Python闭包的奇技淫巧)
def Stack():
items = []
def push(item):
items.append(item)
def pop():
return items.pop()
def __len__():
return len(items)
return ClosureInstance()
if __name__ == '__main__':
s = Stack()
print(s)
s.push(10)
s.push(20)
s.push('Hello')
print(len(s))
print(s.pop())
print(s.pop())
print(s.pop()

学习OpenCV——Python:用OpenCV处理图像

高通滤波器(HPF)

突出一个像素跟周边像素的差异,突出一个图像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# coding:utf-8
import cv2
import numpy as np
from scipy import ndimage
kernel_3x3 = np.array(
[[-1, -1, -1],
[-1, 8, -1],
[-1, -1, -1]]
)
kernel_5x5 = np.array(
[[-1, -1, -1, -1, -1],
[-1, 1, 2, 1, -1],
[-1, 2, 4, 2, -1],
[-1, 1, 2, 1, -1],
[-1, -1, -1, -1, -1]]
)
# 灰度图
img = cv2.imread("0.jpg", 0)
# “convolve”大概翻译为卷积的意思吧
# 通过scipy的ndimage实现
k3 = ndimage.convolve(img, kernel_3x3)
k5 = ndimage.convolve(img, kernel_5x5)
blurred = cv2.GaussianBlur(img, (11,11), 0)
g_hpf = img - blurred
cv2.imshow("3x3", k3)
cv2.imshow("5x5", k5)
cv2.imshow("gw('img',img)_hpf", g_hpf)
cv2.waitKey()
cv2.destroyAllWindows()




可见g_hpf效果最好
### 低通滤波器(LPF)
smoothen一个像素,如果它比某个阀值低。
这在降噪跟模糊中很有用。
如高斯模糊就是一个减弱高频信号的低通滤波器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import cv2
import numpy as np
'''
边缘检测
OpenCV内置的边缘检测:Laplacian(), Sobel(), and Scharr()
当然,边缘检测很容易把噪声当做边缘。所以我们在进行边缘检测之前先进行降噪模糊。
OpenCV内置的模糊函数blur() (simple average), medianBlur(), and GaussianBlur()
下面我们使用medianBlur(),这对于彩色图像的降噪帮助很大。对于边缘检测,我们使用拉普拉斯算子Laplacian()
在降噪之后,我们把色彩空间从BGR转换成灰阶
'''
def strokeEdges(src, dst, blurKsize=7, edgeKsize=5):
if blurKsize >= 3:
# 7*7的模板
blurredSrc = cv2.medianBlur(src, blurKsize)
# BGR to GRAY
graySrc = cv2.cvtColor(blurredSrc, cv2.COLOR_BGR2GRAY)
else:
graySrc = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
# 边缘检测
cv2.Laplacian(graySrc, cv2.CV_8U, graySrc, ksize=edgeKsize)
# 归一化
normalizedInverseAlpha = (1.0 / 255) * (255 - graySrc)
# 分割合并
channels = cv2.split(src)
for channel in channels:
channel[:] = channel * normalizedInverseAlpha
cv2.merge(channels, dst)




自定义模板

OpenCV提供了一个filter2D()函数,供我们自定义模板
我们再来看看模板,它是一个有奇数行跟列的二维数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import cv2
import numpy
'''
边缘检测
OpenCV内置的边缘检测:Laplacian(), Sobel(), and Scharr()
当然,边缘检测很容易把噪声当做边缘。所以我们在进行边缘检测之前先进行降噪模糊。
OpenCV内置的模糊函数blur() (simple average), medianBlur(), and GaussianBlur()
下面我们使用medianBlur(),这对于彩色图像的降噪帮助很大。对于边缘检测,我们使用拉普拉斯算子Laplacian()
在降噪之后,我们把色彩空间从BGR转换成灰阶
'''
def strokeEdges(src, dst, blurKsize=7, edgeKsize=5):
if blurKsize >= 3:
# 7*7的模板
blurredSrc = cv2.medianBlur(src, blurKsize)
# BGR to GRAY
graySrc = cv2.cvtColor(blurredSrc, cv2.COLOR_BGR2GRAY)
else:
graySrc = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
# 边缘检测
cv2.Laplacian(graySrc, cv2.CV_8U, graySrc, ksize=edgeKsize)
# 归一化
normalizedInverseAlpha = (1.0 / 255) * (255 - graySrc)
# 分割合并
channels = cv2.split(src)
for channel in channels:
channel[:] = channel * normalizedInverseAlpha
cv2.merge(channels, dst)
class VConvolutionFilter(object):
def __init__(self, kernel):
self._kernel = kernel
def apply(self, src, dst):
"""应用滤波器"""
# 第二个参数 -1,指的是目标图跟原图具有同样的通道深度(per-channel depth)
cv2.filter2D(src, -1, self._kernel, dst)
class SharpenFilter(VConvolutionFilter):
"""锐化"""
def __init__(self):
# 注意到模板权重总和为1。这种情况下,图像的总体亮度不变。
kernel = numpy.array(
[[-1, -1, -1],
[-1, 9, -1],
[-1, -1, -1]]
)
VConvolutionFilter.__init__(self, kernel)
class FindEdgesFilter(VConvolutionFilter):
"""边缘检测"""
def __init__(self):
# 权重总和0,边缘白色,不是边缘黑色
kernel = numpy.array(
[[-1, -1, -1],
[-1, 8, -1],
[-1, -1, -1]]
)
VConvolutionFilter.__init__(self, kernel)
class BlurFilter(VConvolutionFilter):
"""模糊"""
def __init__(self):
# 权重总和为1
kernel = numpy.array(
[[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04]]
)
VConvolutionFilter.__init__(self, kernel)
class EmbossFilter(VConvolutionFilter):
"""浮雕"""
def __init__(self):
# 绝大多数模板都是对称的,若一边模糊(取正),一边锐化(取负),将会有奇异的效果(浮雕)
kernel = numpy.array(
[[-2, -1, 0],
[-1, 1, 1],
[0, 1, 2]]
)
VConvolutionFilter.__init__(self, kernel)

SharpenFilter



##### FindEdgesFilter



##### BlurFilter



##### EmbossFilter



### 实战
还记得我们之前的那个调用摄像头的实战吗?这次我们在前一次的基础上,加上滤波器看看。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
filter.py
import cv2
import numpy
'''
边缘检测
OpenCV内置的边缘检测:Laplacian(), Sobel(), and Scharr()
当然,边缘检测很容易把噪声当做边缘。所以我们在进行边缘检测之前先进行降噪模糊。
OpenCV内置的模糊函数blur() (simple average), medianBlur(), and GaussianBlur()
下面我们使用medianBlur(),这对于彩色图像的降噪帮助很大。对于边缘检测,我们使用拉普拉斯算子Laplacian()
在降噪之后,我们把色彩空间从BGR转换成灰阶
'''
def strokeEdges(src, dst, blurKsize=7, edgeKsize=5):
if blurKsize >= 3:
# 7*7的模板
blurredSrc = cv2.medianBlur(src, blurKsize)
# BGR to GRAY
graySrc = cv2.cvtColor(blurredSrc, cv2.COLOR_BGR2GRAY)
else:
graySrc = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
# 边缘检测
cv2.Laplacian(graySrc, cv2.CV_8U, graySrc, ksize=edgeKsize)
# 归一化
normalizedInverseAlpha = (1.0 / 255) * (255 - graySrc)
# 分割合并
channels = cv2.split(src)
for channel in channels:
channel[:] = channel * normalizedInverseAlpha
cv2.merge(channels, dst)
class VConvolutionFilter(object):
def __init__(self, kernel):
self._kernel = kernel
def apply(self, src, dst):
"""应用滤波器"""
# 第二个参数 -1,指的是目标图跟原图具有同样的通道深度(per-channel depth)
cv2.filter2D(src, -1, self._kernel, dst)
class SharpenFilter(VConvolutionFilter):
"""锐化"""
def __init__(self):
# 注意到模板权重总和为1。这种情况下,图像的总体亮度不变。
kernel = numpy.array(
[[-1, -1, -1],
[-1, 9, -1],
[-1, -1, -1]]
)
VConvolutionFilter.__init__(self, kernel)
class FindEdgesFilter(VConvolutionFilter):
"""边缘检测"""
def __init__(self):
# 权重总和0,边缘白色,不是边缘黑色
kernel = numpy.array(
[[-1, -1, -1],
[-1, 8, -1],
[-1, -1, -1]]
)
VConvolutionFilter.__init__(self, kernel)
class BlurFilter(VConvolutionFilter):
"""模糊"""
def __init__(self):
# 权重总和为1
kernel = numpy.array(
[[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04],
[0.04, 0.04, 0.04, 0.04, 0.04]]
)
VConvolutionFilter.__init__(self, kernel)
class EmbossFilter(VConvolutionFilter):
"""浮雕"""
def __init__(self):
# 绝大多数模板都是对称的,若一边模糊(取正),一边锐化(取负),将会有奇异的效果(浮雕)
kernel = numpy.array(
[[-2, -1, 0],
[-1, 1, 1],
[0, 1, 2]]
)
VConvolutionFilter.__init__(self, kernel)


#### cameo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# coding:utf-8
import time
import cv2
import numpy
import filters
class CaptureManager(object):
def __init__(self, capture, previewWindowManager=None, shouldMirrorPreview=False):
# 绘制窗口,bool
self.previewWindowManager = previewWindowManager
# 镜像旋转(在窗口中问不是在文件中),bool
self.shouldMirrorPreview = shouldMirrorPreview
self._capture = capture
# 频道
self._channel = 0
self._enteredFrame = False
self._frame = None
# 写入图像
self._imageFilename = None
self._videoFilename = None
self._videoEncoding = None
self._videoWriter = None
self._startTime = None
# 从开始到现在帧数
self._framesElapsed = long(0)
# OpenCV没办法获取FPS,如果需要可以用time.time()计算
self._fpsEstimate = None
@property
def channel(self):
return self._channel
@channel.setter
def channel(self, value):
if self._channel != value:
self._channel = value
self._frame = None
@property
def frame(self):
if self._enteredFrame and self._frame is None:
_, self._frame = self._capture.retrieve()
return self._frame
@property
def isWritingImage(self):
return self._imageFilename is not None
@property
def isWritingVideo(self):
return self._videoFilename is not None
def enterFrame(self):
"""捕获下一帧,如果有的话"""
assert not self._enteredFrame, \
'previous enterFrame() had no matching exitFrame()'
if self._capture is not None:
self._enteredFrame = self._capture.grab()
def exitFrame(self):
"""绘制窗口. 写入文件. 释放."""
if self.frame is None:
self._enteredFrame = False
return
# 获取FPS
if self._framesElapsed == 0:
self._startTime = time.time()
else:
timeElapsed = time.time() - self._startTime
self._fpsEstimate = self._framesElapsed / timeElapsed
self._framesElapsed += 1
# 绘制窗口
if self.previewWindowManager is not None:
if self.shouldMirrorPreview:
mirroredFrame = numpy.fliplr(self._frame).copy()
self.previewWindowManager.show(mirroredFrame)
else:
self.previewWindowManager.show(self._frame)
# 写入图像
if self.isWritingImage:
cv2.imwrite(self._imageFilename, self._frame)
self._imageFilename = None
# 写入视频
self._writeVideoFrame()
# 释放
self._frame = None
self._enteredFrame = False
def writeImage(self, filename):
"""写入一帧到图像"""
self._imageFilename = filename
def startWritingVideo(self, filename, encoding=cv2.VideoWriter_fourcc('I', '4', '2', '0')):
"""开始准备写入视频"""
self._videoFilename = filename
self._videoEncoding = encoding
def stopWritingVideo(self):
"""停止写入视频"""
self._videoFilename = None
self._videoEncoding = None
self._videoWriter = None
def _writeVideoFrame(self):
if not self.isWritingVideo:
return
if self._videoWriter is None:
fps = self._capture.get(cv2.CAP_PROP_FPS)
if fps == 0.0:
# 不能捕获帧数就用之前自己测量的
if self._framesElapsed < 20:
# 等待帧数稳定下来
return
else:
fps = self._fpsEstimate
size = (int(self._capture.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._capture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
self._videoWriter = cv2.VideoWriter(self._videoFilename, self._videoEncoding, fps, size)
self._videoWriter.write(self._frame)
class WindowManager(object):
def __init__(self, windowName, keypressCallback=None):
self.keypressCallback = keypressCallback
self._windowName = windowName
self._isWindowCreated = False
@property
def isWindowCreated(self):
return self._isWindowCreated
def createWindow(self):
cv2.namedWindow(self._windowName)
self._isWindowCreated = True
def show(self, frame):
cv2.imshow(self._windowName, frame)
def destroyWindow(self):
cv2.destroyWindow(self._windowName)
self._isWindowCreated = False
def processEvents(self):
keycode = cv2.waitKey(1)
if self.keypressCallback is not None and keycode != -1:
keycode &= 0xFF
self.keypressCallback(keycode)
class Cameo(object):
def __init__(self):
self._windowManager = WindowManager('Cameo', self.onKeypress)
self._captureManager = CaptureManager(cv2.VideoCapture(1), self._windowManager, True)
self._curveFilter = filters.EmbossFilter()
def run(self):
"""开始循环"""
self._windowManager.createWindow()
while self._windowManager.isWindowCreated:
self._captureManager.enterFrame()
frame = self._captureManager.frame
self._curveFilter.apply(frame, frame)
self._captureManager.exitFrame()
self._windowManager.processEvents()
def onKeypress(self, keycode):
"""
处理案件
space -> 截屏
tab -> 开始/停止录像
escape -> 退出
"""
if keycode == 32: # space
self._captureManager.writeImage('screenshot.png')
elif keycode == 9: # tab
if not self._captureManager.isWritingVideo:
self._captureManager.startWritingVideo('screencast.avi')
else:
self._captureManager.stopWritingVideo()
elif keycode == 27: # escape
self._windowManager.destroyWindow()
if __name__ == "__main__":
Cameo().run()




|