import time
import random
import threading
import multiprocessing as mp
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import psutil
# -----------------------------
# Worker function
# -----------------------------
def worker(task_id, sleep_time, results, lock):
    """Simulate one unit of work and record when it ran.

    Sleeps for ``sleep_time`` seconds, then appends a
    ``(task_id, start_ms, duration_ms)`` tuple to ``results`` while holding
    ``lock``.  Both values are derived from ``time.perf_counter()`` scaled
    to milliseconds, so ``start_ms`` is only meaningful relative to other
    ``perf_counter`` readings.

    Args:
        task_id: Identifier stored as the first element of the record.
        sleep_time: Seconds to sleep, standing in for real work.
        results: Object with an ``append`` method that receives the record.
        lock: Context manager guarding the append to ``results``.
    """
    start_ms = time.perf_counter() * 1000
    time.sleep(sleep_time)  # stand-in for real work
    duration_ms = time.perf_counter() * 1000 - start_ms
    with lock:
        results.append((task_id, start_ms, duration_ms))
# -----------------------------
# Multithreading: many small tasks
# -----------------------------
def run_multithreading(num_threads=4, num_tasks=80, results=None, lock=None):
    """Run ``num_tasks`` short simulated tasks on ``num_threads`` threads.

    Task ``i`` is assigned to lane ``i % num_threads``.  Each lane is one
    real thread that runs its tasks back to back through ``worker``, so the
    lane number recorded in ``results`` identifies the thread that executed
    the task and at most ``num_threads`` tasks run at the same time.

    Args:
        num_threads: Number of worker threads (lanes); must be at least 1.
        num_tasks: Total number of tasks spread across the lanes.
        results: List that receives ``worker`` records.  A new list is
            created when omitted.
        lock: Lock guarding ``results``.  A new ``threading.Lock`` is
            created when omitted.

    Returns:
        The ``results`` list (the one passed in, or the one created here).

    Raises:
        ValueError: If ``num_threads`` is smaller than 1.
    """
    if num_threads < 1:
        raise ValueError("num_threads must be at least 1")
    if results is None:
        results = []
    if lock is None:
        lock = threading.Lock()

    # Draw every sleep time up front on the calling thread so the sequence
    # of random draws does not depend on thread scheduling.
    sleep_times = [random.uniform(0.005, 0.02) for _ in range(num_tasks)]

    def run_lane(lane):
        # Tasks lane, lane + num_threads, ... belong to this lane.
        for sleep_time in sleep_times[lane::num_threads]:
            worker(lane, sleep_time, results, lock)

    threads = [
        threading.Thread(target=run_lane, args=(lane,))
        for lane in range(num_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
# -----------------------------
# Multiprocessing: few long tasks
# -----------------------------
def run_multiprocessing(num_procs=4, results=None, lock=None):
    """Run ``num_procs`` longer simulated tasks, one per child process.

    Each child runs ``worker`` with its index as the task id and a random
    sleep time between 0.2 and 0.4 seconds.

    Args:
        num_procs: Number of child processes to start.
        results: Shared container for ``worker`` records.  It has to be
            shareable across processes (e.g. a ``Manager().list()``).  When
            omitted, a manager-backed list is created for this call.
        lock: Lock guarding ``results``; it has to be usable from child
            processes.  When omitted, a manager-backed lock is created.

    Returns:
        The ``results`` object passed by the caller, or, when none was
        passed, a plain-list copy of the records collected during the call.
    """
    caller_results = results
    # A private manager is only needed when the caller left something out.
    manager = mp.Manager() if results is None or lock is None else None
    try:
        if results is None:
            results = manager.list()
        if lock is None:
            lock = manager.Lock()

        procs = []
        for index in range(num_procs):
            proc = mp.Process(
                target=worker,
                args=(index, random.uniform(0.2, 0.4), results, lock),
            )
            procs.append(proc)
            proc.start()
        for proc in procs:
            proc.join()

        if caller_results is None:
            # Copy out of the proxy before the private manager goes away.
            return results[:]
        return caller_results
    finally:
        if manager is not None:
            manager.shutdown()
# -----------------------------
# Live Plotter
# -----------------------------
def animate_execution(mode="threading", duration=2):
    """Show a live timeline of finished tasks plus per-core CPU usage.

    Starts the task runner in the background (a thread running
    ``run_multithreading`` for ``mode == "threading"``, otherwise a process
    running ``run_multiprocessing``), then opens a two-panel matplotlib
    figure that is refreshed every 100 ms.  The call blocks until the figure
    window is closed and the task runner has finished.

    Args:
        mode: ``"threading"`` selects the threaded runner; any other value
            selects the multiprocessing runner.
        duration: Seconds of live updating.  After that many seconds' worth
            of frames the plot stops refreshing but stays open.
    """
    colors = ['#7fcfd4', '#fff29b', '#c8c0ff', '#ff8f80']
    interval_ms = 100
    frame_count = max(1, int(duration * 1000 / interval_ms))

    # Shared results
    manager = None
    if mode == "threading":
        results = []
        lock = threading.Lock()
        task_runner = threading.Thread(
            target=run_multithreading, args=(4, 80, results, lock)
        )
    else:
        manager = mp.Manager()
        results = manager.list()
        lock = manager.Lock()
        task_runner = mp.Process(
            target=run_multiprocessing, args=(4, results, lock)
        )

    try:
        # Both x-axes are plotted relative to this instant.
        # NOTE(review): assumes perf_counter readings taken in child
        # processes share this process's clock origin - confirm on the
        # target platform.
        origin_ms = time.perf_counter() * 1000
        task_runner.start()
        try:
            # The first non-blocking cpu_percent call has no previous sample
            # to compare against, so take it here and discard it.
            psutil.cpu_percent(percpu=True)

            fig, (ax_timeline, ax_cpu) = plt.subplots(2, 1, figsize=(10, 6))
            cpu_timestamps, cpu_data = [], []

            def label_timeline():
                ax_timeline.set_title(f"{mode} timeline (live)")
                ax_timeline.set_xlabel("time (ms)")
                ax_timeline.set_ylabel("worker")

            def label_cpu():
                ax_cpu.set_title("CPU utilization (live)")
                ax_cpu.set_xlabel("time (ms)")
                ax_cpu.set_ylabel("CPU %")

            label_timeline()
            label_cpu()

            def update(frame):
                # One slice copies the whole list in a single call, which
                # matters when ``results`` is a manager proxy.
                finished = results[:]

                ax_timeline.clear()
                label_timeline()
                for task_id, start, dur in finished:
                    ax_timeline.broken_barh(
                        [(start - origin_ms, dur)], (task_id + 0.1, 0.8),
                        facecolors=colors[task_id % len(colors)]
                    )
                ax_timeline.grid(True, linestyle=":", alpha=0.5)

                # CPU usage
                cpu_data.append(psutil.cpu_percent(percpu=True))
                cpu_timestamps.append(time.perf_counter() * 1000 - origin_ms)
                ax_cpu.clear()
                # Transpose the samples: one series per core.
                for core, core_usage in enumerate(zip(*cpu_data)):
                    ax_cpu.plot(
                        cpu_timestamps, core_usage, label=f"core {core}"
                    )
                label_cpu()
                ax_cpu.legend(fontsize="x-small", ncol=2)

            # Keep a reference: the animation stops if the object is
            # garbage-collected.
            ani = animation.FuncAnimation(
                fig, update, frames=frame_count,
                interval=interval_ms, repeat=False,
            )
            plt.tight_layout()
            plt.show()
        finally:
            task_runner.join()
    finally:
        if manager is not None:
            manager.shutdown()
# -----------------------------
# Main
# -----------------------------
if __name__ == "__main__":
    # The guard also keeps child processes from re-running the demos when
    # multiprocessing re-imports this module.
    print("Running live multithreading demo...")
    animate_execution(mode="threading", duration=2)
    print("Running live multiprocessing demo...")
    animate_execution(mode="multiprocessing", duration=2)
# From Blogger iPhone client