Java / Kotlin Core – Multithreading

RxJava (Reactive Extensions for Java) has a variety of operators that allow you to work with data streams (Observable) asynchronously and reactively. These operators help you create, transform, filter, combine, and process data streams.

Here are the main categories of operators and examples of their use:

Creating Operators:
These operators are used to create streams (Observable).

just():
Creates a stream with one or more elements.

Observable.just("Hello", "RxJava")
    .subscribe(System.out::println);

fromIterable():
Converts a collection into a data stream.

List<String> items = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(items)
    .subscribe(System.out::println);

create():
Creates a stream manually using emission logic.

Observable.create(emitter -> {
    emitter.onNext("Data");
    emitter.onComplete();
}).subscribe(System.out::println);

range():
Emits a sequence of numbers in the specified range.

Observable.range(1, 5)
    .subscribe(System.out::println);

Transforming Operators:
These operators transform data in the stream.

map():
Transforms each element of the stream.

Observable.just(1, 2, 3)
    .map(item -> item * 2)
    .subscribe(System.out::println);

flatMap():
Transforms each element of the source stream into a new stream and merges the inner streams into a single stream. Inner streams may run concurrently, so the order of the results is not guaranteed.

Observable.just(1, 2, 3)
    .flatMap(item -> Observable.just(item * 2))
    .subscribe(System.out::println);

concatMap():
Similar to flatMap(), but subscribes to the inner streams one at a time, so the results keep the order of the source elements.

Observable.just(1, 2, 3)
    .concatMap(item -> Observable.just(item * 10))
    .subscribe(System.out::println); // 10, 20, 30

switchMap():
Transforms each element into a new stream, but when a new element arrives, it unsubscribes from the previous inner stream and switches to the new one. This is useful when only the result for the latest element matters (for example, search-as-you-type).

Observable.just(1, 2, 3)
    .switchMap(item -> Observable.just(item * 10))
    .subscribe(System.out::println); // 10, 20, 30: each inner stream completes synchronously before the next element arrives, so nothing is cancelled here

groupBy():
Splits a stream into multiple streams (groups) based on some criterion.

Observable.just(1, 2, 3, 4, 5, 6)
    .groupBy(item -> item % 2 == 0 ? "Even" : "Odd")
    .subscribe(groupedObservable -> {
        groupedObservable.subscribe(item -> 
            System.out.println(groupedObservable.getKey() + ": " + item)
        );
    }); // Odd: 1 Even: 2 Odd: 3 Even: 4 Odd: 5 Even: 6

flatMapIterable():
Converts each element to an Iterable and emits each element of that Iterable.

Observable.just(1, 2, 3)
    .flatMapIterable(item -> Arrays.asList(item * 10, item * 100))
    .subscribe(System.out::println);  // Result: 10, 100, 20, 200, 30, 300

cast():
Casts each element of the stream to the given type; an element of another type terminates the stream with a ClassCastException.

Observable<Object> observable = Observable.just(1, "Hello", 3.14);
observable.cast(String.class)
    .subscribe(System.out::println, Throwable::printStackTrace);  // ClassCastException: the first element (1) is not a String

buffer():
Collects elements into a buffer (e.g. several at a time) and emits them as lists.

Observable.range(1, 10)
    .buffer(3)
    .subscribe(System.out::println);  // Outputs lists [1, 2, 3], [4, 5, 6], ...

scan():
Applies an accumulator function to each element and returns each intermediate result.

Observable.just(1, 2, 3)
    .scan((acc, item) -> acc + item)
    .subscribe(System.out::println);  // 1, 3, 6

reduce():
Similar to scan(), but returns only the final result of applying the accumulator function.

Observable.just(1, 2, 3)
    .reduce((accumulator, item) -> accumulator + item)
    .subscribe(System.out::println);  // Result: 6

window():
Splits a stream into separate windows (substreams) with a certain number of elements or time.

Observable.range(1, 10)
    .window(3)
    .subscribe(window -> {
        System.out.println("New Window:");
        window.subscribe(System.out::println);
    });

New Window:
1
2
3
New Window:
4
5
6
New Window:
7
8
9
New Window:
10

toList():
Converts a stream of data into a single list.

Observable.just(1, 2, 3)
    .toList()
    .subscribe(System.out::println);  // Result: [1, 2, 3]

toMap():
Converts a data stream into a Map where the keys are generated by a function.

Observable.just("apple", "banana", "cherry")
    .toMap(fruit -> fruit.charAt(0))
    .subscribe(System.out::println);  // Result: {a=apple, b=banana, c=cherry}

startWith():
Adds elements to the beginning of the stream before emitting the original elements (in RxJava 3 the single-value overload is named startWithItem()).

Observable.just(2, 3, 4)
    .startWith(1)
    .subscribe(System.out::println);  // Result: 1, 2, 3, 4

repeat():
Repeats the stream the specified number of times.

Observable.just(1, 2, 3)
    .repeat(2)
    .subscribe(System.out::println);  // Result: 1, 2, 3, 1, 2, 3

delay():
Delays the emission of elements for a certain time.

Observable.just(1, 2, 3)
    .delay(2, TimeUnit.SECONDS)
    .subscribe(System.out::println);  // Emits elements after 2 seconds

debounce():
Emits an element only if a given time span passes without another element being emitted after it (popular for filtering out rapid bursts of events, such as user input).

Observable.create(emitter -> {
    emitter.onNext(1);
    Thread.sleep(300);
    emitter.onNext(2);
    Thread.sleep(100);
    emitter.onNext(3);
    Thread.sleep(400);
    emitter.onComplete();
})
.debounce(200, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);  // Emits 1 and 3 (2 is dropped because 3 arrives within 200 ms of it)

throttleFirst():
Emits the first element from the stream for a given time interval and ignores the remaining elements during that period.

Observable.interval(100, TimeUnit.MILLISECONDS)
    .throttleFirst(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);

throttleLast() (or sample()):
Emits the last element that was generated within the given time interval.

Observable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);

Filtering Operators:
These operators are used to filter data in a stream.

filter():
Passes on only those elements that satisfy the condition.

Observable.just(1, 2, 3, 4, 5)
    .filter(item -> item % 2 == 0)
    .subscribe(System.out::println);  // 2, 4

distinct():
Only allows unique elements to pass through.

Observable.just(1, 2, 2, 3, 3, 3)
    .distinct()
    .subscribe(System.out::println);  // 1, 2, 3

take():
Emits only the specified number of elements.

Observable.just(1, 2, 3, 4)
    .take(2)
    .subscribe(System.out::println);  // 1, 2

skip():
Skips the specified number of elements and then emits the rest.

Observable.just(1, 2, 3, 4)
    .skip(2)
    .subscribe(System.out::println);  // 3, 4

debounce():
Emits an element only if a given time span passes without another element being emitted after it.

Observable.just(1, 2, 3, 4)
    .debounce(100, TimeUnit.MILLISECONDS)
    .subscribe(System.out::println);  // 4 (the source emits all elements at once, so only the last one survives)

Combining Operators:
These operators allow you to combine multiple data streams.

merge():
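Combines multiple streams into one, passing elements on as they arrive; elements from different streams may interleave.

Observable.merge(
    Observable.just("A", "B"),
    Observable.just("1", "2")
).subscribe(System.out::println);  // A, B, 1, 2 (these sources are synchronous; asynchronous sources may interleave)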

zip():
Combines elements from multiple streams using a function.

Observable.zip(
    Observable.just("A", "B"),
    Observable.just("1", "2"),
    (letter, number) -> letter + number
).subscribe(System.out::println);  // A1, B2

concat():
Emits elements from one stream, then moves on to the next.

Observable.concat(
    Observable.just("A", "B"),
    Observable.just("1", "2")
).subscribe(System.out::println);  // A, B, 1, 2

combineLatest():
Combines the latest element from each stream whenever any of the streams emits a new element (once every stream has emitted at least one).

Observable.combineLatest(
    Observable.just("A", "B"),
    Observable.just("1", "2", "3"),
    (letter, number) -> letter + number
).subscribe(System.out::println);  // B1, B2, B3

Error Handling Operators:
These operators help manage errors in the stream.

onErrorReturn():
Emits a fallback element and completes the stream if an error occurs.

Observable.just(1, 2, 0)
    .map(item -> 10 / item)
    .onErrorReturn(e -> -1)
    .subscribe(System.out::println);  // 10, 5, -1

retry():
Resubscribes to the source stream when an error occurs (here at most 2 more times).

Observable.just(1, 2, 0)
    .map(item -> 10 / item)
    .retry(2)
    .subscribe(System.out::println, Throwable::printStackTrace);  // 10, 5, 10, 5, 10, 5, then ArithmeticException

onErrorResumeNext():
Switches to another stream in case of an error.

Observable.just(1, 2, 0)
    .map(item -> 10 / item)
    .onErrorResumeNext(Observable.just(-1))  // RxJava 2 overload; RxJava 3 names it onErrorResumeWith()
    .subscribe(System.out::println);  // 10, 5, -1

Time-based Operators:
These operators allow you to control time in streams.

interval():
Emits elements at regular intervals.

Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);

timer():
Emits a single element after the specified delay.

Observable.timer(2, TimeUnit.SECONDS)
    .subscribe(System.out::println);

Connectable Operators:
They turn a cold stream into a hot one that is shared by all subscribers and starts emitting only after a special method is called.

publish():
Makes the stream "hot", but does not start emitting data until connect() is called.

ConnectableObservable<Long> connectable = Observable.interval(1, TimeUnit.SECONDS).publish();
connectable.subscribe(System.out::println);  // Subscribe first; nothing is emitted yet
connectable.connect();  // Starts data emission for all current subscribers

CopyOnWrite collections:
all mutating operations (add, set, remove) create a new copy of the internal array. Iterators work on the snapshot that existed when they were created, so iterating over the collection never throws ConcurrentModificationException.
CopyOnWriteArrayList, CopyOnWriteArraySet
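
The snapshot behaviour described above can be sketched in a few lines of Java. This is a minimal illustration, not a recommended pattern; the class and method names (CopyOnWriteDemo, iterateWhileAdding) are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class CopyOnWriteDemo {

    // Iterates over the list while appending to it and returns
    // the elements that the iterator actually visited.
    static List<String> iterateWhileAdding() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        for (String item : list) {
            // With a plain ArrayList this modification would make the
            // iterator throw ConcurrentModificationException.
            list.add(item + "!");
            seen.add(item);
        }
        return seen; // only the original snapshot was visited
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding()); // [a, b, c]
    }
}
```

Every add() copies the whole array, so these collections suit data that is read often and modified rarely, such as listener lists.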

Scalable Maps:
improved implementations of HashMap, TreeMap with better support for multithreading and scalability.
ConcurrentMap, ConcurrentHashMap, ConcurrentNavigableMap, ConcurrentSkipListMap, ConcurrentSkipListSet
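
As a rough sketch of how such a map is used, the example below lets several threads count hits for the same key. The names (ConcurrentMapDemo, countHits, the "page" key) are hypothetical; the point is that merge() performs each update atomically, so no external lock is needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentMapDemo {

    // Several threads increment the same key; returns the final count.
    static int countHits(int threads, int hitsPerThread) {
        Map<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < hitsPerThread; i++) {
                    hits.merge("page", 1, Integer::sum); // atomic per key
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return hits.get("page");
    }

    public static void main(String[] args) {
        System.out.println(countHits(4, 10_000)); // 40000
    }
}
```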

Non-Blocking Queues:
thread-safe and non-blocking implementations of Queue on linked nodes.
ConcurrentLinkedQueue, ConcurrentLinkedDeque

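Blocking Queues:
thread-safe queues whose put and take operations can block until space or an element becomes available.
BlockingQueue, ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, BlockingDeque, LinkedBlockingDeque, TransferQueue, LinkedTransferQueue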
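
A typical use of a blocking queue is the producer-consumer pattern. The sketch below assumes a bounded ArrayBlockingQueue of capacity 2 and a made-up end-of-stream marker (POISON_PILL); both are illustrative choices, not the only way to do this.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BlockingQueueDemo {

    private static final int POISON_PILL = -1; // hypothetical end-of-stream marker

    // A producer thread puts 1..itemCount into the queue;
    // the calling thread consumes them and returns their sum.
    static int produceAndConsume(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            while (true) {
                int item = queue.take(); // blocks while the queue is empty
                if (item == POISON_PILL) {
                    break;
                }
                sum += item;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // 55
    }
}
```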

The @Volatile annotation in Kotlin (the volatile keyword in Java) is used to declare variables whose values can be changed by different threads. It guarantees that a write to the variable becomes visible to other threads immediately, instead of being cached in processor registers or thread-local caches, which could otherwise cause visibility issues. It does not make compound operations such as count++ atomic.
volatile can be applied to variables of type Boolean, Byte, Char, Short, Int, Long, Float, Double, and reference types.

@Volatile
private var running: Boolean = false

In this example, the running variable will be updated in memory immediately after it is changed by any thread, and threads that use its value will see the most recent value in memory.
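
The same idea can be sketched in Java with a stop flag. The names here (VolatileFlagDemo, stopWorker) and the timings are assumptions chosen for the illustration: a worker spins until another thread clears the volatile flag.

```java
final class VolatileFlagDemo {

    // Without volatile, the worker might never observe the update.
    private static volatile boolean running = true;

    // Starts a worker, clears the flag, and reports whether the worker stopped.
    static boolean stopWorker() {
        running = true;
        Thread worker = new Thread(() -> {
            while (running) {
                // busy-wait until another thread clears the flag
            }
        });
        worker.start();
        try {
            Thread.sleep(50);
            running = false;   // volatile write, visible to the worker
            worker.join(2000); // wait up to 2 seconds for the worker to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopWorker()); // true
    }
}
```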

synchronized is a keyword in Java (in Kotlin, the synchronized() function and the @Synchronized annotation) that is used to synchronize access to shared resources in multithreaded applications. When multiple threads try to access a shared resource at the same time, problems can arise, such as an inconsistent or corrupted state of the resource. synchronized avoids these problems by ensuring that only one thread at a time can execute the code guarded by the same lock.

// synchronized block on an explicit lock object
synchronized(lock) {
    // a block of code that accesses a shared resource
}

// whole method synchronized on the instance
@Synchronized
fun getCounter(): Int {
    return counter
}

A synchronized block should not be used to guard code in coroutines: it blocks the whole thread instead of suspending the coroutine, and suspending functions cannot be called inside it. Instead, to synchronize access to shared resources between coroutines in Kotlin, use other mechanisms, such as atomic variables, confinement to a single thread, or a Mutex. For example, you can use the Mutex from the kotlinx.coroutines library:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()

fun main() = runBlocking {
    launch {
        mutex.withLock {
            // code that needs to be executed synchronously
        }
    }
}

This example uses Mutex from kotlinx.coroutines, which lets you guard access to a shared resource inside a withLock block. The code inside withLock is executed by only one coroutine at a time, and a coroutine that waits for the lock is suspended rather than blocking its thread.

Race condition:
a situation in which the outcome of a program's execution depends on which threads execute first or last.

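var count = 0

fun main() {
    val first = Thread {
        for (i in 1..100000) {
            count++ // not atomic: read, add, write
        }
    }
    val second = Thread {
        for (i in 1..100000) {
            count++
        }
    }
    first.start()
    second.start()
    first.join() // wait for both threads instead of sleeping
    second.join()
    println("Count: $count")
}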

This example creates two threads, each of which increments the count variable 100000 times. The main thread then prints the value of count. However, count++ is not a single operation: it reads the value, adds 1, and writes the result back. When the two threads interleave these steps, one increment can overwrite the other.

For example, the result may be 199836 on one run of the program and 200000 on another. This happens because count is not updated atomically (it is not protected by any synchronization mechanism), so updates get lost and the final value may be smaller than expected.

To avoid race conditions in such cases, it is necessary to use synchronization and protection mechanisms such as locks and atomic variables, which allow you to guarantee the correctness of the operations and avoid unexpected results.
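
One possible fix, sketched here in Java, replaces the plain counter with an AtomicInteger. The class and method names (SafeCounterDemo, countWithAtomic) are made up for the example; a synchronized block or a lock around the increment would work as well.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SafeCounterDemo {

    // Two threads increment a shared counter perThread times each.
    static int countWithAtomic(int perThread) {
        AtomicInteger count = new AtomicInteger();
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                count.incrementAndGet(); // atomic read-modify-write
            }
        };
        Thread first = new Thread(task);
        Thread second = new Thread(task);
        first.start();
        second.start();
        try {
            first.join(); // wait for completion instead of sleeping
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomic(100_000)); // 200000
    }
}
```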

Deadlock:
occurs when two or more threads are blocked waiting for each other to release resources needed to continue execution.

Incorrect use of synchronized: acquiring locks in an inconsistent order or holding them for too long can lead to deadlocks or to threads that stay blocked.

To avoid these problems in Kotlin, you can use synchronization tools such as mutexes, locks, and atomic variables, as well as modern multithreaded programming practices such as using immutable data structures and changing shared data only within critical sections.

class Resource(private val name: String) {
    @Synchronized fun checkResource(other: Resource) {
        println("$name: checking ${other.name}")
        Thread.sleep(1000)
        other.checkResource(this)
    }
}

fun main() {
    val resource1 = Resource("Resource 1")
    val resource2 = Resource("Resource 2")
    Thread {
        resource1.checkResource(resource2)
    }.start()
    Thread {
        resource2.checkResource(resource1)
    }.start()
}

In this example, two Resource objects are created, and the checkResource() method of each is synchronized on its own instance through the @Synchronized annotation. Then two threads are created that call checkResource() on the two resources in opposite order. The first thread locks resource1 and the second thread locks resource2; each then tries to call the synchronized method of the other resource, so both threads wait for each other forever, which is a deadlock.

This deadlock can be avoided by always acquiring the locks in the same order in every thread, by guarding both resources with a single shared lock, or by using a lock with a timeout such as ReentrantLock.tryLock().
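
A common way to rule out this kind of deadlock is to acquire locks in one global order. The Java sketch below assumes a hypothetical Account class with a unique id that defines that order; two threads transfer money in opposite directions and still cannot block each other.

```java
final class LockOrderingDemo {

    static final class Account {
        final int id; // hypothetical unique id used to order the locks
        int balance;

        Account(int id, int balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    // Always locks the account with the smaller id first,
    // regardless of the direction of the transfer.
    static void transfer(Account from, Account to, int amount) {
        Account first = from.id < to.id ? from : to;
        Account second = first == from ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Runs transfers in opposite directions and returns the total balance.
    static int runOpposingTransfers() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) transfer(a, b, 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) transfer(b, a, 1);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a.balance + b.balance;
    }

    public static void main(String[] args) {
        System.out.println(runOpposingTransfers()); // 2000
    }
}
```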

Livelock:
a situation in which two or more threads continue to perform actions to avoid blocking, but fail to complete their work. As a result, they are in an infinite loop, consuming more and more resources without performing any useful work

data class Person(val name: String, val isPolite: Boolean = true) {
    fun greet(other: Person) {
        while (true) {
            if (isPolite) {
                println("$name: After you, ${other.name}")
                Thread.sleep(1000)
                if (other.isPolite) {
                    break
                }
            } else {
                println("$name: No, please, after you, ${other.name}")
                Thread.sleep(1000)
                if (!other.isPolite) {
                    break
                }
            }
        }
    }
}

fun main() {
    val john = Person("John", true)
    val jane = Person("Jane", false)
    Thread {
        john.greet(jane)
    }.start()
    Thread {
        jane.greet(john)
    }.start()
}

This example creates two Person objects, each of which can be polite or impolite depending on the value of the isPolite property. It then creates two threads, each of which calls the greet() method on one person and passes the other. The greet() method loops, printing a phrase and checking the other person's isPolite flag to decide whether to stop.

With the values above (John is polite, Jane is not), neither exit condition is ever met: John keeps yielding to Jane and Jane keeps yielding to John. If both had the same isPolite value, each loop would end after the first iteration. Both threads stay active in an infinite loop without doing any useful work, which is a simplified illustration of a livelock.

A livelock can be avoided, for example, by limiting the number of retries or the time an operation may take, or by adding a random delay before retrying, so that the threads stop mirroring each other. Synchronization and locks can also be used to ensure correct communication between threads.

Lock:
is a synchronization mechanism that is used to prevent contention for access to shared resources between multiple threads in a multithreaded environment. When a thread uses a lock, it gains exclusive access to the resource associated with the lock, and other threads that attempt to access that resource are blocked until the first thread releases the lock.

ReentrantLock:
is an implementation of the Lock interface in Java that allows a thread to acquire and release a lock multiple times. It is called “reentrant” because a thread that already holds the lock can acquire it again without blocking; the lock is released once unlock() has been called as many times as lock().
ReentrantLock gives finer control over locking than synchronized blocks, as it provides additional features: interruptible lock acquisition (lockInterruptibly()), an attempt to acquire the lock with a timeout (tryLock()), an optional fairness policy, and multiple Condition objects for coordinating waiting threads.
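
Two of these properties, re-entry by the owner and a timed tryLock(), can be sketched as follows. The class and method names (ReentrantLockDemo, reenter, otherThreadCanLock) and the 100 ms timeout are assumptions made for the illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class ReentrantLockDemo {

    private static final ReentrantLock lock = new ReentrantLock();

    // The same thread acquires the lock twice and returns the hold count.
    static int reenter() {
        lock.lock();
        try {
            lock.lock(); // re-entry by the owner does not block
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    // While the caller holds the lock, another thread tries to take it
    // and gives up after a timeout. Returns whether it succeeded.
    static boolean otherThreadCanLock() {
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                        acquired[0] = true;
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(reenter());            // 2
        System.out.println(otherThreadCanLock()); // false
    }
}
```

Unlike synchronized, the lock must be released explicitly, which is why every lock() is paired with unlock() in a finally block.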

Buffer:
a backpressure strategy that buffers all emitted items until the consumer is ready to process them. It can lead to memory exhaustion if the producer generates items too quickly or the consumer processes items too slowly.

Drop:
discards elements that cannot be processed by the consumer. This strategy does not guarantee that all elements will be processed, but it avoids memory exhaustion.

Latest:
keeps only the last element and discards all others. This strategy is suitable for scenarios where only the last element matters.

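Error:
Signals an error (MissingBackpressureException) if the consumer cannot keep up with the data.

Missing:
Applies no backpressure strategy: elements are passed on without buffering or dropping, and a downstream operator has to handle the overflow.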
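
The difference between the Buffer, Drop, and Latest strategies can be imitated with plain JDK classes. The sketch below is only an analogy and does not use the RxJava API: a producer emits 1..n while the consumer takes nothing, and each hypothetical method (buffer, drop, latest) returns what would be left for the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

final class BackpressureSketch {

    // Buffer: an unbounded queue keeps everything (and may exhaust memory).
    static List<Integer> buffer(int produced) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= produced; i++) {
            queue.offer(i);
        }
        return new ArrayList<>(queue);
    }

    // Drop: a bounded queue rejects new elements while it is full.
    static List<Integer> drop(int produced, int capacity) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 1; i <= produced; i++) {
            queue.offer(i); // returns false when full; the element is discarded
        }
        return new ArrayList<>(queue);
    }

    // Latest: a single slot is overwritten by every new element.
    static Integer latest(int produced) {
        AtomicReference<Integer> slot = new AtomicReference<>();
        for (int i = 1; i <= produced; i++) {
            slot.set(i);
        }
        return slot.get();
    }

    public static void main(String[] args) {
        System.out.println(buffer(5));  // [1, 2, 3, 4, 5]
        System.out.println(drop(5, 2)); // [1, 2]
        System.out.println(latest(5));  // 5
    }
}
```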

Cold Observable:
Does not send objects until at least one subscriber has subscribed to it;
If an observable has multiple subscribers, it will broadcast the entire sequence of objects to each subscriber.

Hot Observable:
Sends out objects when they appear, regardless of whether there are subscribers;
Each new subscriber receives only new objects, not the entire sequence.

The volatile keyword and Atomic* classes in Java are used to ensure thread safety when accessing shared memory.

volatile:
ensures that the variable's value is always read from shared memory, not from the thread cache, preventing synchronization errors. Additionally, writing to a volatile variable is also written directly to shared memory, not to the thread cache.

Atomic Classes:
provide atomicity for operations on variables. That is, they guarantee that compound operations such as increment or compare-and-set are performed as a single, indivisible action. Atomic classes are implemented with hardware compare-and-swap (CAS) instructions, so they are often more efficient than locks.

In general, if you only need a value written by one thread to be visible to others, you can use volatile. If you need to perform an atomic read-modify-write operation on a variable, you should use the Atomic* classes.
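
To show what an atomic read-modify-write looks like in practice, here is a small compare-and-set retry loop. The names (CompareAndSetDemo, updateMax, maxOf) are invented for the example; it keeps a running maximum that several threads update without a lock.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CompareAndSetDemo {

    // Raises max to candidate if candidate is larger, retrying
    // when another thread changed the value in the meantime.
    static void updateMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();
            if (candidate <= current) {
                return; // nothing to update
            }
            if (max.compareAndSet(current, candidate)) {
                return; // our update won
            }
            // lost the race: loop and re-read the current value
        }
    }

    // Each value is offered by its own thread; returns the maximum.
    static int maxOf(int... values) {
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] threads = new Thread[values.length];
        for (int i = 0; i < values.length; i++) {
            int value = values[i];
            threads[i] = new Thread(() -> updateMax(max, value));
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(3, 9, 4, 7)); // 9
    }
}
```

A volatile int could not do this safely, because the check and the write would be two separate steps.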

One of the mechanisms for working with asynchronous operations in Kotlin is deferred coroutines. A Deferred is an object that represents the future result of an asynchronous operation.

Deferred coroutines are created using the async function and can be used to perform long-running operations in the background while keeping the main application thread free.
Once a deferred coroutine completes its work, its result can be retrieved using the await() function. Both join() and await() suspend the calling coroutine without blocking its thread; the difference is that join() only waits for completion, while await() also returns the result (or rethrows the exception).
Here is an example of using deferred coroutines in Kotlin:
Here is an example of using deferred coroutines in Kotlin:

fun loadDataAsync(): Deferred<List<Data>> = GlobalScope.async {
    // loading data from the network or database in a background thread
    emptyList<Data>() // placeholder result so that the block returns a List<Data>
}
fun displayData() {
    GlobalScope.launch {
        // we start a coroutine to load data
        val deferredData = loadDataAsync()
        // let's do some actions in the main thread
        // we get the result of executing the deferred coroutine
        val data = deferredData.await()
        // we process data
    }
}

LAZY coroutines are coroutines whose start is postponed, either by wrapping async in the lazy function from the Kotlin standard library or by passing start = CoroutineStart.LAZY to a coroutine builder from the kotlinx.coroutines library.
The main difference between ordinary deferred coroutines and LAZY coroutines is the moment execution starts. A coroutine created with async is scheduled for execution immediately, while a LAZY coroutine starts only when its result is first needed or when it is started explicitly.
As a rule, LAZY coroutines are used in cases where there is no need to immediately start executing the coroutine, for example, when we do not know whether its execution will be necessary at all. This allows us to avoid unnecessary resource costs and speed up the application.
Here is an example of creating a LAZY coroutine:

val lazyCoroutine: Lazy<Deferred<String>> = lazy { 
    GlobalScope.async { 
        // running coroutine in background thread 
        "Hello from coroutine!" 
    } 
}

fun main() { 
    // performing actions in the main thread 
    // getting the result of coroutine execution 
    val result = runBlocking { lazyCoroutine.value.await() } 
    // processing the result 
    println(result) 
}

// or

fun main() = runBlocking<Unit> {
    val lazyCoroutine = launch(start = CoroutineStart.LAZY) {
        println("Coroutine is executing")
    }
    // performing other actions in the main thread
    lazyCoroutine.start() // coroutine launch
    // performing other actions in the main thread
    lazyCoroutine.join() // waiting for coroutine to complete
}

In this example, we create a lazy coroutine using the launch function and pass it the parameter start = CoroutineStart.LAZY. Then we do some other work on the main thread, and only after that we call the start() method of the coroutine to start its execution.
It is important to understand that a lazy coroutine runs only after start() or join() is called on it (or await() for async); otherwise it is never executed, and an enclosing scope such as runBlocking keeps waiting for it.
We can also use the CoroutineStart.LAZY parameter with the async function (withContext has no start parameter). In this case, a lazy coroutine is created that returns the result of the computation, and calling await() on it starts it and returns the result.

A Channel is a component provided by Kotlin Coroutines that allows you to pass values asynchronously between coroutines.
Channels are a higher-level abstraction over synchronization primitives such as locks and semaphores. They allow values to be passed between coroutines while maintaining correct order and safety in a multithreaded environment.

Kotlin Channels are similar to message queues and have the following basic operations: send and receive. A channel can also be closed, which signals that no more elements will be sent.
Additionally, Kotlin Channels support various buffering options such as limiting the buffer size and choosing what happens on overflow; a timeout for receiving a message can be added with withTimeout().

Overall, Kotlin Channels allow you to organize asynchronous data transfer in your application more efficiently and safely.

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
// Expected output: 1 4 9 16 25 Done!

In this example, we created a Channel using Channel<Int>(), which allows passing integers. We then started a coroutine that sends five messages to the channel using the send() method. In the main coroutine, we receive messages from the channel using the receive() method and print them to the console.
Here we have used the runBlocking function to bridge the regular blocking main() function and suspending code: it runs the coroutine on the current thread and blocks it until all child coroutines complete. The launch function does not create a new thread here; the child coroutine inherits the dispatcher of runBlocking and runs on the same thread.

Semaphore:
a type of lock that limits the number of threads that can enter a given section of code.
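
As an illustration, the Java sketch below sends several workers through a section guarded by a java.util.concurrent.Semaphore and records how many were inside at once. The names (SemaphoreDemo, maxConcurrent) and the 20 ms of simulated work are assumptions made for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreDemo {

    // Runs the given number of workers through a section guarded by
    // `permits` permits and returns the highest number of workers
    // observed inside the section at the same time.
    static int maxConcurrent(int threads, int permits) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks while no permit is free
                    try {
                        int now = inside.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // simulated work
                    } finally {
                        inside.decrementAndGet();
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(8, 3)); // never more than 3
    }
}
```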

Mutex:
An object for thread synchronization, attached to every object in Java.
Can have 2 states – free and busy. The state of the mutex cannot be controlled directly.
If another thread needs to access a variable protected by a mutex, that thread is blocked until the mutex is released.
It differs from a semaphore in that only the thread that owns it can release it.
In the block of code marked with the word synchronized, a mutex is acquired.
The purpose of a mutex is to protect an object from being accessed by threads other than the one that owns the mutex.
A mutex protects data from corruption due to asynchronous changes (race conditions), but if used incorrectly it can cause other problems such as deadlock or double locking.

Monitor:
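a high-level mechanism for interaction and synchronization of threads that provides mutually exclusive access to resources that cannot be used by several threads at once, together with the ability to wait for a condition (wait/notify).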
Creates a protective mechanism for implementing synchronized blocks.
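
In Java every object has such a monitor, used by synchronized together with wait() and notifyAll(). The one-slot Mailbox below is a hypothetical example of that combination: the sender waits while the slot is full, and the receiver waits while it is empty.

```java
final class MonitorDemo {

    // A one-slot mailbox guarded by its own monitor.
    static final class Mailbox {
        private String message; // null means the slot is empty

        synchronized void put(String value) throws InterruptedException {
            while (message != null) {
                wait(); // releases the monitor until the slot is free
            }
            message = value;
            notifyAll();
        }

        synchronized String take() throws InterruptedException {
            while (message == null) {
                wait(); // releases the monitor until a message arrives
            }
            String value = message;
            message = null;
            notifyAll();
            return value;
        }
    }

    // A sender thread puts two messages; the caller takes both.
    static String exchange() {
        Mailbox mailbox = new Mailbox();
        Thread sender = new Thread(() -> {
            try {
                mailbox.put("ping");
                mailbox.put("pong");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        try {
            String result = mailbox.take() + " " + mailbox.take();
            sender.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange()); // ping pong
    }
}
```

The wait() calls sit inside while loops so that a thread re-checks its condition after every wake-up.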

Copyright: Roman Kryvolapov