
Lecture 9-10: Parallel and Concurrent

by Hongwoo 2024. 4. 6.


    Learning Objectives

    • Explain what concurrency and parallel computing are
    • Explain how we can fake parallelism on a single executor
    • Explain what atomicity is and how it is important for concurrent code
    • Explain what critical sections are and different ways in which you can protect them against concurrent use
    • Explain the potential problems with concurrent execution and how to prevent them
    • Write safe concurrent programs for parallel execution
    • Explain what futures and promises are, and how async/await notation makes their use convenient
    • Explain how (software) transactional memory works

     

     

    Concurrency

    Concurrency is the ability of different parts or units of a program to be executed out-of-order or in partial order, without affecting the outcome. This allows for parallel execution of the concurrent units.

    → The code blocks are independent of each other: even if they are executed in a different order, the outcome has to be the same.

     

     

    Parallel Computing

    Parallel computing is a type of computation in which many calculations or processes are carried out simultaneously.

     

     

    Concurrency - Thread

    Independently schedulable entity

    Allows you to run code concurrently

     

    Does this mean that threads run in parallel? → No: threads allow out-of-order execution, but there is no guarantee that they are executed at the same time.

     

     

    Concurrency - Faking parallelism

    Do a little bit of one task, then switch to a different one (if you switch quickly enough, within milliseconds, it looks parallel)

    Rinse and repeat: it looks parallel, but nothing is actually running in parallel

     

     

    Concurrency - Preemption

    Interrupt a task and switch to a different one

    How to switch?  → interrupt mechanism in hardware

     

    Cost of "Save the value I was at" (saving state)

    Cost of "Where was I?" (restoring state)

    → Expensive

     

     

    Concurrent Algorithms

    Not everything can be done in parallel (it depends on the computer's hardware), so we make it concurrent instead

    Divide-and-conquer algorithms are usually easier to parallelize 

    Amdahl's law: adding more processors does not keep increasing the speedup; the serial part of the program limits it (saturation), as the formula below shows
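    As a formula (the standard form of Amdahl's law, not taken from the slides; here p is the fraction of the work that can be parallelized and n is the number of processors):

    S(n) = \frac{1}{(1 - p) + \frac{p}{n}}, \qquad \lim_{n \to \infty} S(n) = \frac{1}{1 - p}

    So even with unlimited processors, the speedup saturates at 1/(1 - p).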

     

     

    Concurrency Problems

    Safety

    Concurrent code is more prone to errors

    - Does it work for all possible orderings of operations?

     

     

    Race Condition

    The outcome depends on the sequence or timing of individual instructions, e.g. two threads taking from a queue at the same time

     

    Deadlock

    Threads wait for each other forever, so none of them can make progress (see the bank-transfer example later)

     

    Heisenbugs

    The act of observing a system alters its state

     

     

    Example:

    class ParallelTest1 {
      public static void main(String[] args) throws Exception {
        Counter c = new Counter();
        Thread t1 = new Thread(() -> {
          for (int i = 0; i < 300; i++) {
            c.increment();
          }
        });
        Thread t2 = new Thread(() -> {
          for (int i = 0; i < 300; i++) {
            c.increment();
          }
        });
        t1.start();
        t2.start();
    
        // Wait for both to be done
        t1.join();
        t2.join();
    
        // Print the final result
        System.out.println(c.get());
      }
    }
    
    class Counter {
      private int x;
    
      // Intended as single operations for atomicity, but x++ is not actually atomic (see below)
      public void increment() { x++; }
      public int get() { return x; }
    }

     

    The first thread is started before the second, so there is a bit of time in which it can count 30 times without a problem; but for 300 or 3000 iterations per thread, the result is not 600 or 6000.

     

    class ParallelTest1 {
      public static void main(String[] args) throws Exception {
        Counter c = new Counter();
        Thread t1 = new Thread(() -> {
          for (int i = 0; i < 300; i++) {
            System.out.println("" + i);
            c.increment();
          }
        });
        Thread t2 = new Thread(() -> {
          for (int i = 0; i < 300; i++) {
            System.out.println("" + i);
            c.increment();
          }
        });
        t1.start();
        t2.start();
    
        // Wait for both to be done
        t1.join();
        t2.join();
    
        // Print the final result
        System.out.println(c.get());
      }
    }
    
    class Counter {
      private int x;
    
      // Intended as single operations for atomicity, but x++ is not actually atomic (see below)
      public void increment() { x++; }
      public int get() { return x; }
    }

     

    If we add a print statement to the for-loop, it does print 600 or 6000. (Observing the system alters its state: a Heisenbug.)

     

    x++ is really x = x + 1, which consists of three operations:

    1. Read x

    2. Increment the value (on the stack)

    3. Write the result back into x

    So x++ is three operations, not one

     

    After reading x in step 1, between steps 1 and 2 the other thread can perform its own increment, so one increment is lost. That is why the count is not 600 or 6000.
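    A minimal sketch of why this loses an update (the class and the starting value 5 are just for illustration):

    // What "x++" actually does: three separate steps, not one atomic action.
    class NonAtomicIncrement {
      private int x = 5;

      void increment() {
        int tmp = x;    // 1. read x            (both threads may read the same 5)
        tmp = tmp + 1;  // 2. increment locally (both compute 6)
        x = tmp;        // 3. write back        (both write 6, so one increment is lost)
      }
    }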

     

    So we need to care about the atomicity of actions (other actions can happen before or after them, but not during).

     

     

    Concurrency Problems - Fairness

    Many implementations are unfair 

    - Accidentally or intentionally

    - Unfair solutions are usually faster (less to do)

    Starvation is possible (one or more threads never get to execute)

     

     

    Not Enough Chocolate

    Goal: 

    - If there is no chocolate, someone buys it

    - Only one person buys chocolate at a time

    - We don't end up with multiple chocolates

     

     

    Solution 1:

     

     

    Solution 2: Communicate using notes

     

     

    Solution 3

     

     

    What we need is a way to prevent interleaved operations

    - Critical section (only one thread at a time can enter the section)

    - Do more things in one step: atomic read-modify-write operations

     

     

    Concurrency Solutions

    Critical Sections

    Critical section: a region that should be entered by only one thread at a time

    Locks: a mechanism to make sure such a region can be entered by only one thread at a time

     

    E.g. a critical section in Java with synchronized

    The synchronized keyword:

    - Allows only one thread at a time to enter that method/block

    - Is guarded by a particular object (its monitor)

     

    class SynchronizedExample1 {
      public static void main(String[] args) throws Exception {
        Object obj = new Object();
        new Thread(() -> doStuff(obj), "T1").start();
        new Thread(() -> doStuff(obj), "T2").start();
        // Is T1 always first?
      }
    
      public static void doStuff(Object obj) {
        System.out.println("Thread " + Thread.currentThread().getName() + " before critical section");
        synchronized (obj) {
          System.out.println("Thread " + Thread.currentThread().getName() + " entered critical section");
    
          Util.doLongCalculation(10000);
    
          System.out.println("Thread " + Thread.currentThread().getName() + " leaving critical section");
        }
        System.out.println("Thread " + Thread.currentThread().getName() + " left critical section");
      }
    }

     

    Running this gives:

    Thread T1 before critical section

    Thread T1 entered critical section

    Thread T2 before critical section

    Thread T1 leaving critical section

    Thread T1 left critical section

    Thread T2 entered critical section

     

    Similarly, if we make Counter's increment method synchronized as well:

    It returns 6000, because only one thread can increment at a time (see the sketch below)
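    A minimal sketch of that change (the Counter from the examples above; get() is also synchronized here for visibility, which the slides do not mention explicitly):

    class Counter {
      private int x;

      // Only one thread at a time can hold this object's monitor,
      // so the read-modify-write in increment() can no longer be interleaved.
      public synchronized void increment() { x++; }
      public synchronized int get() { return x; }
    }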

     

     

    Example: Critical Section in Java using Locks

    The Lock interface and ReentrantLock class (java.util.concurrent.locks)

    Instead of synchronized, you can also use locks

     

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    class LocksExample1 {
      public static void main(String[] args) {
        MyObject m = new MyObject();
        new Thread(() -> m.doStuff(), "T1").start();
        new Thread(() -> m.doOtherStuff(), "T2").start();
      }
    }
    
    class MyObject {
      private Lock lock = new ReentrantLock();
    
      public void doStuff() {
        System.out.println("Thread " + Thread.currentThread().getName() + " in doStuff, before lock");
        lock.lock();
        System.out.println("Thread " + Thread.currentThread().getName() + " in doStuff, after lock");
    
        Util.doLongCalculation(10000);
    
        System.out.println("Thread " + Thread.currentThread().getName() + " in doStuff, before unlock");
        lock.unlock();
        System.out.println("Thread " + Thread.currentThread().getName() + " in doStuff, after unlock");
      }
    
      public void doOtherStuff() {
        System.out.println("Thread " + Thread.currentThread().getName() + " in doOtherStuff, before lock");
        lock.lock();
        System.out.println("Thread " + Thread.currentThread().getName() + " in doOtherStuff, after lock");
    
        Util.doLongCalculation(10000);
        
        System.out.println("Thread " + Thread.currentThread().getName() + " in doOtherStuff, before unlock");
        lock.unlock();
        System.out.println("Thread " + Thread.currentThread().getName() + " in doOtherStuff, after unlock");
      }
    }

     

    Thread T1 in doStuff, before lock

    Thread T1 in doStuff, after lock

    Thread T2 in doOtherStuff, before lock

    Thread T1 in doStuff, before unlock

    Thread T1 in doStuff, after unlock

    Thread T2 in doOtherStuff, after lock 

    ...

     

    Nothing guarantees the ordering, so T2 may enter the critical section first

     

     

    Issues of locking

    Performance

    Fairness:

    - Which thread gets to go next is essentially random

    - Fair lock is possible, but often slower

    Deadlocks

     

     

    public class Deadlock {
      private static Lock lockA = new ReentrantLock();
      private static Lock lockB = new ReentrantLock();
      private static int balanceA = 1_000_000;
      private static int balanceB = 1_000_000;
    
      /**
       * Showcases race condition for parallel non-atomic read/write of balances.
       * Using locks showcases a deadlock.
       * Show how print frequency affects how long it takes until deadlock.
       * Show how order of which lock is locked first affects whether deadlock occurs or not.
       */
      public static void main(String[] args) throws Exception {
        // Thread to transfer 100,000 euro from A to B, 1 euro at a time.
        Thread aToB = new Thread(() -> {
          for (int i = 0; i < 100_000; i++) {
            transferAtoB_noLock(1);
            // transferAtoB_withLock(1);
            // if (i % 10 == 0) System.out.println("Transferred " + i + " from A to B");
          }
        });
    
        // Thread to transfer 100,000 euro from B to A, 1 euro at a time.
        Thread bToA = new Thread(() -> {
          for (int i = 0; i < 100_000; i++) {
            transferBtoA_noLock(1);
            // transferBtoA_withLock(1);
            // if (i % 10 == 0) System.out.println("Transferred " + i + " from B to A");
          }
        });
    
        // Start transferring money
        aToB.start();
        bToA.start();
    
        // Wait for both to finish
        aToB.join();
        bToA.join();
    
        // Report how much money each has.
        reportMoney();
      }
    
      public static void reportMoney() {
        System.out.println("Balance A: " + balanceA);
        System.out.println("Balance B: " + balanceB);
      }
    
      /**
       * Transfer the given amount of money from A to B, without locking.
       */
      public static void transferAtoB_noLock(int amount) {
        balanceA = balanceA - amount;
        balanceB = balanceB + amount;
      }
    
      /**
       * Transfer the given amount of money from B to A, without locking.
       */
      public static void transferBtoA_noLock(int amount) {
        balanceB = balanceB - amount;
        balanceA = balanceA + amount;
      }
    
      /**
       * Transfer the given amount of money from A to B.
       */
      public static void transferAtoB_withLock(int amount) {
        lockA.lock();
        try {
          lockB.lock();
          try {
            balanceA = balanceA - amount;
            balanceB = balanceB + amount;
          } finally {
            lockB.unlock();
          }
          // Do stuff 
        } finally {
          lockA.unlock();
        }
      }
    
    
    
      /**
       * Transfer the given amount of money from B to A.
       */
      public static void transferBtoA_withLock(int amount) {
        lockB.lock();
        try {
          lockA.lock();
          try {
            balanceB = balanceB - amount;
            balanceA = balanceA + amount;
          } finally {
            lockA.unlock();
          }
        } finally {
          lockB.unlock();
        }
      }
    
      
    }

     

    Bank example:

    With the no-lock transfer methods, the final balances are wrong (race condition)

    With the locking transfer methods as written, we get a deadlock

     

    The A-to-B thread locks A first and then B, while the B-to-A thread locks B first and then A.

    One thread holds lock A, the other holds lock B; the first then tries to lock B and waits, the second tries to lock A and waits, so they wait for each other forever.

    If both threads acquire the locks in the same order, this circular wait cannot happen and there is no deadlock (see the sketch below).
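    A minimal sketch of that fix: transferBtoA_withLock rewritten so that both transfer directions take the locks in the same global order (lockA first, then lockB):

      public static void transferBtoA_withLock(int amount) {
        lockA.lock();          // always acquire lockA first, in both transfer directions
        try {
          lockB.lock();        // then lockB, so a circular wait is impossible
          try {
            balanceB = balanceB - amount;
            balanceA = balanceA + amount;
          } finally {
            lockB.unlock();
          }
        } finally {
          lockA.unlock();
        }
      }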

     

     

     

    Atomicity

    All or nothing

    To implement "all or nothing" operations, hardware support is needed (the steps must not be interleaved with other threads)

     

     

    Atomic Read-Modify-Write Operations

    Explicit CPU support

    Compare-and-Swap (CAS)

     

     

    Example: Java AtomicInteger

    How is AtomicInteger implemented?

    General approach:

    - Use CAS in a while loop until successful

    - Very short window for concurrent issues, so usually succeeds first try

    Lockless way to do a concurrent counter
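    A minimal sketch of that idea, using AtomicInteger's compareAndSet (roughly what incrementAndGet already does for us; CasCounter is just an illustrative name):

    import java.util.concurrent.atomic.AtomicInteger;

    // Lock-free counter: retry the compare-and-swap until no other thread
    // changed the value between our read and our write.
    class CasCounter {
      private final AtomicInteger x = new AtomicInteger();

      public int increment() {
        while (true) {
          int current = x.get();                // read
          int next = current + 1;               // modify
          if (x.compareAndSet(current, next)) { // write, but only if x is still 'current'
            return next;
          }
          // Another thread changed x in between: loop and try again.
        }
      }
    }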

     

     

    How to make your life easier?

    Go lockless if possible

    If a lock is necessary, hold it for as short a time as possible

    The amount and frequency of concurrent access influence the best solution; check what your scenario is

    Don't reinvent the wheel, use existing solutions suited to your scenario

    Don't use side effects

    Use pure functions wherever possible

     

     

    AtomicInteger vs Locks

    AtomicInteger is an integer variable that can be concurrently used

    AtomicInteger is more specialized

    AtomicInteger is more performant for its usage

    You can create locks with an atomic integer (or boolean, long, float, etc)

    You can create (less performant) atomic integers with locks

    AtomicInteger knows "lock time" is generally short

    Locks know nothing about how long the "lock time" will be

     

    AtomicInteger: is the value that we expected it to be? If so, update. If not, try again.

     

     

    Concurrency Solutions have Problems

    Example 1:

     

     

    Example 2:

     

     

    Example 3:

    If an exception is thrown inside the locked region and we never catch it, the lock will never be unlocked.

    → The list operations can also throw exceptions.

    To fix it: use try/catch or try/finally (finally block: whatever happens, unlock the lock) → safe for concurrent use (see the sketch below)
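    A minimal sketch of the try/finally pattern (a hypothetical method guarding an ArrayList with a ReentrantLock, not the exact code from the slide):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class SafeLocking {
      private final Lock lock = new ReentrantLock();
      private final List<Integer> list = new ArrayList<>();

      public void addSafely(int value) {
        lock.lock();
        try {
          list.add(value);  // may throw, like the list operations on the slide
        } finally {
          lock.unlock();    // whatever happens, the lock is released
        }
      }
    }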

     

    Example 4:

     

    Concurrency-safe, because we don't make any structural modifications to the ArrayList, such as adding or removing elements.

     

     

     

    Software Transactional Memory

    Analogous to database transactions, where many readers and writers access the same data concurrently

    ACID

     

     

    What is the problem?

    Concurrent mutation causes all of our issues

    Idea: Restrict writes (and reads) of variables that are used concurrently

    Rust: only one thread (the owner) can have a writable view of a variable (ownership); while a writable view exists, no one else can read it

    STM: Force read/write of transactional variables to be in transactions

     

     

    Example:

     

    import scala.concurrent.stm._
    
    object STMExample1 {
      def main(args: Array[String]) = {
        // In one thread, transfer money from x to y (after a delay of 0.5 sec)
        new Thread(() => {
          Thread.sleep(500)
          transfer(2)
        }).start()
    
        // In another thread, read x and y (with a delay of 1 sec between them)
        new Thread(() => {
          sum()
        }).start()
    
        // Wait until both have finished
        Thread.sleep(3000)
    
        println("Money: x=" + x.single() + ", y=" + y.single())
      }
    
      val (x, y) = (Ref(10), Ref(0))
    
      def sum() = {
        println("Starting sum")
        atomic { implicit transaction => 
          println("Reading x")
          val a = x()
          Thread.sleep(1000)
          println("Reading y")
          val b = y()
          a + b
        }
      }
    
      def transfer(n: Int) = {
        println("Transferring " + n + " from x to y")
        atomic { implicit transaction =>
          x -= n
          y += n
        }
        println("Finished transferring money!")
      }
    }

     

    Because of the delays, the execution interleaves like this:

    The sum transaction starts first, reads x, and then waits

    The transfer then runs and commits in between, modifying x and y

     

    What the transactions do: they automatically detect this conflict and automatically fix it (the sum transaction is rolled back and retried)

    → The sum that is reported is always correct

     

     

    Transactions

    A transaction is applied in full, or (in case of conflict) not applied at all

    Transactions can be safely retried

    → Transactions must not have side effects outside of what can safely be retried (e.g. side-effecting actions such as printing)

     

     

    Side-effects

    Transactions should only contain:

    - STM actions

    - Reversible/non-side-effecting operations

    Only once a transaction is committed can you rely on anything that happened inside the transaction block

     

    Dealing with inconsistencies

    STM detects inconsistencies and automatically retries

     

    The STM keeps track of everything the transaction has read so far; if it detects that another thread wrote to one of those variables (e.g. x) in the meantime, it rolls the transaction back and retries.

     

     

    Retry Operation

    When a transaction calls retry (e.g. because a condition it needs does not hold), it would be wasteful to immediately check again

    Instead, STM waits until a field the transaction read is changed

    It automatically detects which fields were read

     

     

    Composing Transactions

    With orAtomic multiple actions can be composed

    atomic { A } orAtomic { B }

    If A retries, then attempt B

    If B also retries, wait for variables read by A or B to change

    Once a change is detected, rerun the corresponding action

     

     

    How to implement?

    Usually optimistic approach (lock-free)

    Assume it will go well, check at end if that was the correct assumption

    E.g. 

    - keep a global version

    - keep version for each variable (last time read/written)

    - when reading/writing, if the version of the variable is greater than the version at the start of the transaction → abort (see the sketch below)
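    A minimal sketch of that version check (hypothetical names, not ScalaSTM's actual implementation; only the "abort if the variable is newer than the transaction" part is shown):

    import java.util.concurrent.atomic.AtomicLong;

    // A transactional variable that remembers the version of its last write.
    class TxnVar<T> {
      volatile T value;
      volatile long version;
      TxnVar(T initial) { value = initial; }
    }

    class TxnAbort extends RuntimeException {}

    class VersionedRead {
      static final AtomicLong globalVersion = new AtomicLong();

      // Read tvar inside a transaction that started when the global version was startVersion.
      static <T> T read(TxnVar<T> tvar, long startVersion) {
        T result = tvar.value;
        if (tvar.version > startVersion) {
          throw new TxnAbort();  // written after our transaction started: abort and retry
        }
        return result;
      }
    }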

     

     

    Advantages/Disadvantages

    Advantages:

    - No deadlocks

    - No race conditions

    - Simple to use (isolation → composable)

    - Easy to understand code

     

    Disadvantages:

    - More overhead when not using a lot of threads

     

     

    Future & Promises

    Abstraction for asynchronous computation

    A proxy for something that is not yet done/known

    → Result will appear in future eventually

    Usually used in combination with a ThreadPool

     

     

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class FutureExample {
      private static ExecutorService executor = Executors.newFixedThreadPool(2);

      public static Future<Integer> calculate(int input) {
        System.out.println("Starting calculation");
        return executor.submit(() -> {
          // Simulate a long calculation
          Util.doLongCalculation(10000);
          return input * input;
        });
      }

      public static void main(String[] args) throws Exception {
        Future<Integer> f = calculate(5);
        System.out.println("Is the task already done? " + f.isDone());
        System.out.println("Submission of task is done, but we are still executing");
        System.out.println("In theory we could do something else at this point");
        System.out.println("Result: " + f.get()); // get() blocks until the result is available
        executor.shutdown(); // allow the JVM to exit once the pool's threads finish
      }
    }

     

    Starting calculation

    Is the task already done? false

    Submission of task is done, but we are still executing

    In theory we could do something else at this point

     

     

    Async/Await

    Language support for promises/futures → Javascript, C#, Rust

    Shared Thread Pool (or even just a single executor)

    Async functions wrap their results in promises/futures

    Awaiting a promise/future actually gets the result

     

    // sleep(ms) is assumed to be a helper that returns a Promise resolving after ms milliseconds
    async function f2() {
      var a = await sleep(5000);
      return a;
    }
    
    // console.log(f2());
    f2().then(console.log);
    
    
    
    // TODO Show in browser with proper fetch support and awaiting at top level
    
    const address = fetch("https://jsonplaceholder.typicode.com/users/1")
      .then((response) => response.json())
      .then((user) => {
        return user.address;
      });
    
    address.then(console.log);
    console.log("Main thread exits");
    
    
    // const printAddress = () => {
    //   address.then((a) => {
    //     console.log(a);
    //   });
    // };
    
    // printAddress();

     

     
