Animated comparison multithreaded school multiprocessing

import time

import random

import threading

import multiprocessing as mp

import matplotlib.pyplot as plt

import matplotlib.animation as animation

import psutil


# -----------------------------

# Worker function

# -----------------------------

def worker(task_id, sleep_time, results, lock):

  start = time.perf_counter() * 1000 # ms

  time.sleep(sleep_time)       # simulate work

  end = time.perf_counter() * 1000

  with lock:

    results.append((task_id, start, end - start))



# -----------------------------

# Multithreading: many small tasks

# -----------------------------

def run_multithreading(num_threads=4, num_tasks=80, results=None, lock=None):

  threads = []

  for i in range(num_tasks):

    t = threading.Thread(

      target=worker,

      args=(i % num_threads, random.uniform(0.005, 0.02), results, lock)

    )

    threads.append(t)

    t.start()

  for t in threads:

    t.join()



# -----------------------------

# Multiprocessing: few long tasks

# -----------------------------

def run_multiprocessing(num_procs=4, results=None, lock=None):

  procs = []

  for i in range(num_procs):

    p = mp.Process(

      target=worker,

      args=(i, random.uniform(0.2, 0.4), results, lock)

    )

    procs.append(p)

    p.start()

  for p in procs:

    p.join()



# -----------------------------

# Live Plotter

# -----------------------------

def animate_execution(mode="threading", duration=2):

  colors = ['#7fcfd4', '#fff29b', '#c8c0ff', '#ff8f80']


  # Shared results

  if mode == "threading":

    results = []

    lock = threading.Lock()

    task_runner = threading.Thread(target=run_multithreading, args=(4, 80, results, lock))

  else:

    manager = mp.Manager()

    results = manager.list()

    lock = manager.Lock()

    task_runner = mp.Process(target=run_multiprocessing, args=(4, results, lock))


  task_runner.start()


  # Setup figure

  fig, (ax_timeline, ax_cpu) = plt.subplots(2, 1, figsize=(10, 6))

  ax_timeline.set_title(f"{mode} timeline (live)")

  ax_timeline.set_xlabel("time (ms)")

  ax_timeline.set_ylabel("worker")

  ax_cpu.set_title("CPU utilization (live)")

  ax_cpu.set_xlabel("time (ms)")

  ax_cpu.set_ylabel("CPU %")


  cpu_timestamps, cpu_data = [], []


  # Animation update function

  def update(frame):

    now = time.perf_counter() * 1000

    ax_timeline.clear()

    ax_timeline.set_title(f"{mode} timeline (live)")

    ax_timeline.set_xlabel("time (ms)")

    ax_timeline.set_ylabel("worker")


    # Draw intervals so far

    for task_id, start, dur in list(results):

      ax_timeline.broken_barh(

        [(start, dur)], (task_id + 0.1, 0.8),

        facecolors=colors[task_id % len(colors)]

      )

    ax_timeline.grid(True, linestyle=":", alpha=0.5)


    # CPU usage

    usage = psutil.cpu_percent(percpu=True)

    cpu_data.append(usage)

    elapsed = (time.perf_counter() * 1000)

    cpu_timestamps.append(elapsed)


    ax_cpu.clear()

    for core in range(len(cpu_data[0])):

      core_usage = [row[core] for row in cpu_data]

      ax_cpu.plot(cpu_timestamps, core_usage, label=f"core {core}")

    ax_cpu.set_title("CPU utilization (live)")

    ax_cpu.set_xlabel("time (ms)")

    ax_cpu.set_ylabel("CPU %")

    ax_cpu.legend(fontsize="x-small", ncol=2)


  ani = animation.FuncAnimation(fig, update, interval=100)

  plt.tight_layout()

  plt.show()


  task_runner.join()



# -----------------------------

# Main

# -----------------------------

if __name__ == "__main__":

  print("Running live multithreading demo...")

  animate_execution(mode="threading", duration=2)


  print("Running live multiprocessing demo...")

  animate_execution(mode="multiprocessing", duration=2)

From Blogger iPhone client