佇列(Queue)

佇列(Queue)

Author: Jed. Feb 06, 2025

Queue介紹

Queue也是一個重要的線性資料結構之一,Insert資料至Queue稱為enqueue,從Queue讀取或取出稱為dequeue,特色是先進先出(FIFO)

例子-排隊

ABCD在排隊買飲料,我們可以從圖中看到D先到所以買完飲料後就走,這就是先進先出(FIFO)特性。

graph LR A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]

enqueue

顧名思義就是將資料加入Queue,以下為排隊為例

graph LR A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]

假設有個a要來排飲料這時Queue變化就會像這樣

graph LR a["a"] --- A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]

後面如果又來個b要排飲料一樣接續在a後面,以此類推

graph LR b["b"] --- a["a"] --- A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]

dequeue

其實也很簡單,就是買完飲料後離開,延續上一個例子

graph LR b["b"] --- a["a"] --- A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]

D買完後離開變動

graph LR b["b"] --- a["a"] --- A["A"] --- B["B"] --- C["C"] --- E["飲料店櫃台"]

實作Queue的兩種資料結構方式

python 有提供Queuemodule,如果是自己實作透過ArrayLinked List則需要知道以下重點

Array實作考量

  • Dequeue操作的時間複雜度為 O(n),因為需要移動所有元素。
  • 需要考量Resize,如果陣列滿了,需要擴展陣列(動態陣列),這會導致額外的開銷。

Linked List實作考量

  • 需要額外的空間存儲指針。
  • 因為是順序讀取Queue,因此取得Queue大小時間複雜度為O(n)

ArrayLinked List 時間複雜度

操作 使用 Array 的時間複雜度 使用 Linked List 的時間複雜度
enqueue O(1) O(1)
dequeue O(n) O(1)
查看queue header O(1) O(1)
檢查queue是否為空 O(1) O(1)
獲取queue大小 O(1) O(n)

Queue的應用- 緩衝區(Buffer)

我們可以模擬iOT裝置上船數據至伺服器,並且及時統計數據,首先模擬感測器上傳,先建立Queue當我們的數據緩衝區,接著加上threading module 模擬iOT設備不定時上傳數據至伺服器的Queue

import threading
import queue
import random
import time

sensor_data_queue = queue.Queue()

# 模擬 3 種不同 IoT 設備
SENSOR_TYPES = ["temperature", "humidity", "pressure"]
NUM_DEVICES = 3
threads = []

def simulate_iot_device(device_id):
    """模擬 IoT 設備,定期產生感測數據"""
    while True:
        sensor_type = random.choice(SENSOR_TYPES)
        value = round(random.uniform(20, 100), 2)
        timestamp = time.time()

        data = {"device_id": device_id, "type": sensor_type, "value": value, "timestamp": timestamp}

        sensor_data_queue.put(data)
        print(f"[IoT {device_id}] 上傳數據: {data}")

        time.sleep(random.uniform(1, 3))  # 模擬不定時數據上傳
for i in range(NUM_DEVICES):
    thread = threading.Thread(target=simulate_iot_device, args=(i,))
    thread.daemon = True
    thread.start()
    threads.append(thread)

接著在寫計算感測器上傳及時統計數據

def process_sensor_data():
    """從 Queue 取出數據並計算統計資訊"""
    stats = {"temperature": [], "humidity": [], "pressure": []}

    while True:
        if not sensor_data_queue.empty():
            data = sensor_data_queue.get()
            print(sensor_data_queue.qsize()) # 列出queue目前裡面數據還剩多少沒處理
            sensor_type = data["type"]
            stats[sensor_type].append(data["value"])

            # 計算即時統計數據
            avg = round(sum(stats[sensor_type]) / len(stats[sensor_type]), 2)
            max_val = max(stats[sensor_type])
            min_val = min(stats[sensor_type])

            print(f"[伺服器] {sensor_type.upper()} 數據分析 - 平均值: {avg}, 最大值: {max_val}, 最小值: {min_val}")

        time.sleep(1)

server_thread = threading.Thread(target=process_sensor_data, daemon=True)
server_thread.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\n[主程序] 停止 IoT 模擬與數據處理。")

完整程式碼

import threading
import queue
import random
import time

sensor_data_queue = queue.Queue()

SENSOR_TYPES = ["temperature", "humidity", "pressure"]

def simulate_iot_device(device_id):
    """模擬 IoT 設備,定期產生感測數據"""
    while True:
        sensor_type = random.choice(SENSOR_TYPES)
        value = round(random.uniform(20, 100), 2)
        timestamp = time.time()

        data = {"device_id": device_id, "type": sensor_type, "value": value, "timestamp": timestamp}

        sensor_data_queue.put(data)
        print(f"[IoT {device_id}] 上傳數據: {data}")

        time.sleep(random.uniform(1, 3))

def process_sensor_data():
    """從 Queue 取出數據並計算統計資訊"""
    stats = {"temperature": [], "humidity": [], "pressure": []}

    while True:
        if not sensor_data_queue.empty():
            data = sensor_data_queue.get()
            print(sensor_data_queue.qsize())
            sensor_type = data["type"]
            stats[sensor_type].append(data["value"])
            avg = round(sum(stats[sensor_type]) / len(stats[sensor_type]), 2)
            max_val = max(stats[sensor_type])
            min_val = min(stats[sensor_type])

            print(f"[伺服器] {sensor_type.upper()} 數據分析 - 平均值: {avg}, 最大值: {max_val}, 最小值: {min_val}")

        time.sleep(1)

NUM_DEVICES = 3
threads = []

for i in range(NUM_DEVICES):
    thread = threading.Thread(target=simulate_iot_device, args=(i,))
    thread.daemon = True
    thread.start()
    threads.append(thread)

server_thread = threading.Thread(target=process_sensor_data, daemon=True)
server_thread.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\n[主程序] 停止 IoT 模擬與數據處理。")

結果,可以看到模擬的數據上傳,以及當前Queue 緩衝區內有多少數據尚未處理。

image.png

當然這只是簡單的範例,事實上大多數iOT數據確實是用Queue來保存數據,以免甚麼意外導致數據遺失,而數據通常會放在Data Warehouse,而不是簡簡單單統計完數據就丟。

結論

Queue算是淺顯易懂的數據結構,但運用非常廣泛像是常見的緩衝區,以及消息隊列(Message Queue)、 任務排隊與調度等等應用,後端工程師應該會非常常碰到。