MyException - 我的异常网
当前位置:我的异常网» Perl/Python » day9-队列queue

day9-队列queue

www.MyException.Cn  网友分享于:2013-09-28  浏览:0次
day9--队列queue

queue队列

    Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递。一个线程放入数据,另外一个线程取数据。

    class queue.Queue(maxsize=0)                                            #先入先出

    class queue.LifoQueue(maxsize=0)                                        #后入先出(Last in first out)

    class queue.PriorityQueue(maxsize=0)                                    #存储数据时可设置优先级的队列

    队列中的方法:

    1.queue.Queue.get()     #获取队列数据,当队列是空的时候,会卡主,等待数据的放入,没有数据放入,会一直阻塞

    get(self, block=True, timeout=None)     #默认状态下,block()如果没有数据是阻塞的

import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get())
运行程序:
1
2
阻塞................

 

    队列就是用来存取数据的,当数据取完之后,就会等待新的数据放入,get()就会一直等待,知道数据放入。要想不等待,可以使用下面方法:

    当然,使用get()加上参数block=False也能实现和get_nowait()一样的功能。

    block=True(False)设置当队列是空的时候,是否阻塞,True阻塞,False不阻塞,报错。timeout=None(time)设置阻塞时间,即等待一段时间,如果在这段时间内,没有数据放入,就报错。

    2.get_nowait()          #获取数据,如果队列是空的,则报错  

import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get_nowait())
执行结果如下:
1
2
Traceback (most recent call last):
  File "/home/zhuzhu/day9/队列.py", line 8, in <module>
    print(q.get_nowait())
  File "/usr/lib/python3.5/queue.py", line 192, in get_nowait
    return self.get(block=False)
  File "/usr/lib/python3.5/queue.py", line 161, in get
    raise Empty
queue.Empty

    上面使用,get_nowait(),如果队列是空的,则报错,可以用异常来抓取异常,然后可以继续执行。

    3.queue.Queue.qsize()    #判断队列里面元素的个数

import queue

q = queue.Queue()
print(q.qsize())
q.put(1)
print(q.qsize())
q.put(2)
print(q.qsize())
执行结果:
0
1
2

    q.qsize()是判断队列的长度,如果长度为0,说明队列是空的,这个时候使用get()就要注意,程序会阻塞。

    4.q.qut()   #向队列中放入数据

    put(self, item, block=True, timeout=None)

    put()和get()差不多一样,put()当队列满的时候,会报错,block是设置阻塞状态是否开启,timeout是设置阻塞时间,默认一直阻塞。

    5.q.empty(self)     #判断队列是否是空Return True if the queue is empty, False otherwise (not reliable!).

    6.q.full()         #判断队列是否是满的Return True if the queue is full, False otherwise (not reliable!)

    7.put_nowait()  等价于put(block=False)   #如果队列满了,则报错Put an item into the queue without blocking

    下面来看一下LifoQueue,后进先出的情形:

import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)

print("第一个取出:",q.get())
print("第二个取出:",q.get())

    上面是LifoQueue(maxsize=0)的情形,后进入的先被取出。

    下面来看一下PriorityQueue的情形,有优先级的queue:

import queue

q = queue.PriorityQueue()

q.put((3,"alex"))
q.put((1,"geng"))
q.put((8,"zeng"))

print("第一个取出",q.get())
print("第二个取出:",q.get())
print("第三个取出:",q.get())
执行结果:
第一个取出 (1, 'geng')
第二个取出: (3, 'alex')
第三个取出: (8, 'zeng')

    上面程序中,是有优先级的放入,put((等级,内容)),存放以元组形式放入,前一个是登记,后面一个是消息。用来VIP优先级的情形。

    生产者消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    下面来学习一个最基本的生产者消费者模型的例子:

'''生产者消费者模型就是两个线程,一个生产,另外一个消费,两者相互配合,有交互'''
import queue,time,threading

def producer(name):
    '''定义生产者模型'''
    count = 1                                                    #初始化变量
    while True:
        q.put("骨头%s" %count)                                   #生成骨头
        print("[%s]生成了骨头%s" %(name,count))
        count += 1                                               #每次生产一个
        time.sleep(0.5)                                          #定义产能,生产效率

def consumer(name):
    '''定义消费者模型'''
    while True:
        print("\033[31m[%s] 吃了[%s]\033[0m" %(name,q.get()))
        time.sleep(1)                                             #定义消费效率


if __name__ == "__main__":
    try:
        q = queue.Queue(maxsize=10)                              #初始化一个Queue,并且定义最大容量
        p = threading.Thread(target=producer,args=("geng",))     #初始化生产者线程
        p.start()
    except KeyboardInterrupt as f:
        print("生产者线程断开了!!")

    try:
        c=threading.Thread(target=consumer,args=("alex",))
        c.start()
    except KeyboardInterrupt as e:
        print("线程断开了!!!")
执行结果:
[geng]生成了骨头1
[alex] 吃了[骨头1]
[geng]生成了骨头2
[geng]生成了骨头3
[alex] 吃了[骨头2]
[geng]生成了骨头4
[alex] 吃了[骨头3]
[geng]生成了骨头5
[geng]生成了骨头6
[alex] 吃了[骨头4]
[geng]生成了骨头7
[geng]生成了骨头8
[alex] 吃了[骨头5]
[geng]生成了骨头9
[geng]生成了骨头10
[alex] 吃了[骨头6]
[geng]生成了骨头11
[geng]生成了骨头12
[alex] 吃了[骨头7]
[geng]生成了骨头13
[geng]生成了骨头14

    上面就是生产者和消费者的简单模型,使用了queue(队列),生成者就是生成商品,然后放到队列中;消费者就是去这个队列中根据条件取数,这样不断生产和取数,就是简单的生产者消费者模型,其中time.sleep()是生成效率和消费效率,控制程序的节奏,而count+=1代表消费者生产的能力,每次只生成一个,如果把这个调成10,那么效率就很高,每次生成完成之后,都要等待很久。当然,要调效率,要修改一下代码。

    队列queue的源代码如下:

'''A multi-producer, multi-consumer queue.'''

try:
    import threading
except ImportError:
    import dummy_threading as threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']

class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
    pass

class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass

class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()


class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

    看看源代码,能够让自己对这些方法,有更好的理解,之后会多看看源代码是如何写的。多参考源代码的写法,里面有很多好的书写习惯和格式。

文章评论

 程序员的样子
程序员的样子
每天工作4小时的程序员
每天工作4小时的程序员
一个程序员的时间管理
一个程序员的时间管理
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
Java程序员必看电影
Java程序员必看电影
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
程序员都该阅读的书
程序员都该阅读的书
我是如何打败拖延症的
我是如何打败拖延症的
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
老程序员的下场
老程序员的下场
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
程序员应该关注的一些事儿
程序员应该关注的一些事儿
我的丈夫是个程序员
我的丈夫是个程序员
代码女神横空出世
代码女神横空出世
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
编程语言是女人
编程语言是女人
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
程序员的鄙视链
程序员的鄙视链
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
鲜为人知的编程真相
鲜为人知的编程真相
为什么程序员都是夜猫子
为什么程序员都是夜猫子
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
程序员必看的十大电影
程序员必看的十大电影
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
如何成为一名黑客
如何成为一名黑客
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
中美印日四国程序员比较
中美印日四国程序员比较
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
程序员和编码员之间的区别
程序员和编码员之间的区别
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有