data:image/s3,"s3://crabby-images/7f2a5/7f2a58ecd1449de4f254fa03bd0d6294a84e7f90" alt="Python lock queue"
For each get()used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. Indicate that a formerly enqueued task is complete.
Python lock queue code#
If you observe the code of the process_itemsmethod, I added this line at the end of the 1-second processing code simulation sleep: The consumer thread was created using an infinite loop, and we must correct this, so the processing thread stops properly when all work is complete.īefore doing that, I want to introduce the concept of task. All the items have been produced and consumed.
Python lock queue full#
The producer thread was able to insert the first 5 element quite fast (~10 items per second), but from then on, it had to settle for the pace fixed by the consumer thread (1 item per second), as the queue has been kept full at all times. You will see that, with a consumer thread available for item extraction and processing, the producer thread has been able to go beyond the 5 th insertion.
data:image/s3,"s3://crabby-images/9dd28/9dd28c6a26e1aa26979419a6a1cefa74142cd77a" alt="python lock queue python lock queue"
Process_items - Processing queue size: 0. Process_items - Processing queue size: 1. Process_items - Processing queue size: 2. Process_items - Processing queue size: 3. Process_items - Processing queue size: 4. Process_items - Processing queue size: 5. Remaining tasks: 5Īdd_items - Processing queue size: 5. Remaining tasks: 4Īdd_items - Processing queue size: 4. Remaining tasks: 3Īdd_items - Processing queue size: 3. Remaining tasks: 2Īdd_items - Processing queue size: 2. Remaining tasks: 1Īdd_items - Processing queue size: 1. In my computer, the execution results in this sequence of logs:Īdd_items - Processing queue size: 0. T_process = threading.Thread(target=process_items, args=(test_queue,), # insertion thread t_add = threading.Thread(target=add_items,Īrgs=(test_queue, 10), name= " add_items_thread") format(processing_queue.qsize(), processing_queue.unfinished_tasks)) def main():
data:image/s3,"s3://crabby-images/4b987/4b9873e6bce7db6ca11cadde40d51e04ac2c68d1" alt="python lock queue python lock queue"
Print( " add_items - Processing queue size: ". I will improve this point in a later step, to add some way to stop this thread in a controlled manner.ĭef add_items(processing_queue, num_items):
data:image/s3,"s3://crabby-images/4a954/4a954a36737878c5d74986b7826818ebe810d0db" alt="python lock queue python lock queue"
The processing method includes an infinite loop ( while True), to make sure that the thread continues extracting elements when possible. We will simulate the time consumed processing the elements executing performing a sleep on the processing thread.
data:image/s3,"s3://crabby-images/84721/847216bbd9810d74e9fd21fb5b8e4580c1571c4c" alt="python lock queue python lock queue"
I will follow my own recommendations on this in a later article, for the sake of simplicity. My recommendation is to use blocking for the get, and provide the call with an appropriate timeout (in seconds), to avoid issues with the uninterruptible wait. In this case, the exception raised is Queue.Empty, when no element can be extracted from the queue.
data:image/s3,"s3://crabby-images/7f2a5/7f2a58ecd1449de4f254fa03bd0d6294a84e7f90" alt="Python lock queue"