佇列(Queue)
Author: Jed. Feb 06, 2025
Queue介紹
Queue
也是一個重要的線性資料結構之一,Insert
資料至Queue
稱為enqueue
,從Queue
讀取或取出稱為dequeue
,特色是先進先出(FIFO)
。
例子-排隊
有A
、B
、C
、D
在排隊買飲料,我們可以從圖中看到D
先到所以買完飲料後就走,這就是先進先出(FIFO)
特性。
graph LR
A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]
enqueue
顧名思義就是將資料加入Queue
,以下為排隊為例
graph LR
A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]
假設有個a
要來排飲料這時Queue
變化就會像這樣
graph LR
a["a"] --- A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]
後面如果又來個b
要排飲料一樣接續在a
後面,以此類推
graph LR
b["b"] --- a["a"] --- A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]
dequeue
其實也很簡單,就是買完飲料後離開,延續上一個例子
graph LR
b["b"] --- a["a"] --- A["A"] --- B["B"] --- C["C"] --- D["D"] --- E["飲料店櫃台"]
當D
買完後離開變動
graph LR
b["b"] --- a["a"] --- A["A"] --- B["B"] --- C["C"] --- E["飲料店櫃台"]
實作Queue的兩種資料結構方式
python
有提供Queue
的module
,如果是自己實作透過Array
與Linked List
則需要知道以下重點
Array實作考量
Dequeue
操作的時間複雜度為O(n)
,因為需要移動所有元素。- 需要考量
Resize
,如果陣列滿了,需要擴展陣列(動態陣列),這會導致額外的開銷。
Linked List實作考量
- 需要額外的空間存儲指針。
- 因為是順序讀取Queue,因此取得
Queue
大小時間複雜度為O(n)
Array
與Linked List
時間複雜度
操作 | 使用 Array 的時間複雜度 | 使用 Linked List 的時間複雜度 |
---|---|---|
enqueue | O(1) | O(1) |
dequeue | O(n) | O(1) |
查看queue header | O(1) | O(1) |
檢查queue是否為空 | O(1) | O(1) |
獲取queue大小 | O(1) | O(n) |
Queue的應用- 緩衝區(Buffer)
我們可以模擬iOT
裝置上船數據至伺服器,並且及時統計數據,首先模擬感測器上傳,先建立Queue
當我們的數據緩衝區,接著加上threading module
模擬iOT
設備不定時上傳數據至伺服器的Queue
。
import threading
import queue
import random
import time
sensor_data_queue = queue.Queue()
# 模擬 3 種不同 IoT 設備
SENSOR_TYPES = ["temperature", "humidity", "pressure"]
NUM_DEVICES = 3
threads = []
def simulate_iot_device(device_id):
"""模擬 IoT 設備,定期產生感測數據"""
while True:
sensor_type = random.choice(SENSOR_TYPES)
value = round(random.uniform(20, 100), 2)
timestamp = time.time()
data = {"device_id": device_id, "type": sensor_type, "value": value, "timestamp": timestamp}
sensor_data_queue.put(data)
print(f"[IoT {device_id}] 上傳數據: {data}")
time.sleep(random.uniform(1, 3)) # 模擬不定時數據上傳
for i in range(NUM_DEVICES):
thread = threading.Thread(target=simulate_iot_device, args=(i,))
thread.daemon = True
thread.start()
threads.append(thread)
接著在寫計算感測器上傳及時統計數據
def process_sensor_data():
"""從 Queue 取出數據並計算統計資訊"""
stats = {"temperature": [], "humidity": [], "pressure": []}
while True:
if not sensor_data_queue.empty():
data = sensor_data_queue.get()
print(sensor_data_queue.qsize()) # 列出queue目前裡面數據還剩多少沒處理
sensor_type = data["type"]
stats[sensor_type].append(data["value"])
# 計算即時統計數據
avg = round(sum(stats[sensor_type]) / len(stats[sensor_type]), 2)
max_val = max(stats[sensor_type])
min_val = min(stats[sensor_type])
print(f"[伺服器] {sensor_type.upper()} 數據分析 - 平均值: {avg}, 最大值: {max_val}, 最小值: {min_val}")
time.sleep(1)
server_thread = threading.Thread(target=process_sensor_data, daemon=True)
server_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[主程序] 停止 IoT 模擬與數據處理。")
完整程式碼
import threading
import queue
import random
import time
sensor_data_queue = queue.Queue()
SENSOR_TYPES = ["temperature", "humidity", "pressure"]
def simulate_iot_device(device_id):
"""模擬 IoT 設備,定期產生感測數據"""
while True:
sensor_type = random.choice(SENSOR_TYPES)
value = round(random.uniform(20, 100), 2)
timestamp = time.time()
data = {"device_id": device_id, "type": sensor_type, "value": value, "timestamp": timestamp}
sensor_data_queue.put(data)
print(f"[IoT {device_id}] 上傳數據: {data}")
time.sleep(random.uniform(1, 3))
def process_sensor_data():
"""從 Queue 取出數據並計算統計資訊"""
stats = {"temperature": [], "humidity": [], "pressure": []}
while True:
if not sensor_data_queue.empty():
data = sensor_data_queue.get()
print(sensor_data_queue.qsize())
sensor_type = data["type"]
stats[sensor_type].append(data["value"])
avg = round(sum(stats[sensor_type]) / len(stats[sensor_type]), 2)
max_val = max(stats[sensor_type])
min_val = min(stats[sensor_type])
print(f"[伺服器] {sensor_type.upper()} 數據分析 - 平均值: {avg}, 最大值: {max_val}, 最小值: {min_val}")
time.sleep(1)
NUM_DEVICES = 3
threads = []
for i in range(NUM_DEVICES):
thread = threading.Thread(target=simulate_iot_device, args=(i,))
thread.daemon = True
thread.start()
threads.append(thread)
server_thread = threading.Thread(target=process_sensor_data, daemon=True)
server_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[主程序] 停止 IoT 模擬與數據處理。")
結果,可以看到模擬的數據上傳,以及當前Queue
緩衝區內有多少數據尚未處理。
當然這只是簡單的範例,事實上大多數iOT
數據確實是用Queue
來保存數據,以免甚麼意外導致數據遺失,而數據通常會放在Data Warehouse
,而不是簡簡單單統計完數據就丟。
結論
Queue
算是淺顯易懂的數據結構,但運用非常廣泛像是常見的緩衝區,以及消息隊列(Message Queue)
、 任務排隊與調度等等應用,後端工程師應該會非常常碰到。