Многопоточность thread, runnable
Содержание:
- Закрытие потоков
- Создание потока данных
- Запуск задач с помощью java.util.concurrent.ExecutorService
- Настройка размера пула
- Нет необходимости писать свое
- Создание потока с интерфейсом Runnable
- Методы wait и notify
- Класс Thread
- Новые возможности пакета java.uti.concurrent
- Проблемы, которые решает многопоточность в Java
Закрытие потоков
Последнее обновление: 25.04.2018
При завершении работы с потоком его надо закрыть с помощью метода close(), который определен в интерфейсе
Closeable. Метод close имеет следующее определение:
void close() throws IOException
Этот интерфейс уже реализуется в классах InputStream и OutputStream, а через них и во всех классах потоков.
При закрытии потока освобождаются все выделенные для него ресурсы, например, файл. В случае, если поток окажется не закрыт, может происходить утечка памяти.
Есть два способа закрытия файла. Первый традиционный заключается в использовании блока . Например, считаем данные из файла:
import java.io.*; public class Program { public static void main(String[] args) { FileInputStream fin=null; try { fin = new FileInputStream("C://SomeDir//notes.txt"); int i=-1; while((i=fin.read())!=-1){ System.out.print((char)i); } } catch(IOException ex){ System.out.println(ex.getMessage()); } finally{ try{ if(fin!=null) fin.close(); } catch(IOException ex){ System.out.println(ex.getMessage()); } } } }
Поскольку при открытии или считывании файла может произойти ошибка ввода-вывода, то код считывания помещается в блок try. И чтобы быть уверенным, что
поток в любом случае закроется, даже если при работе с ним возникнет ошибка, вызов метода помещается в блок .
И, так как метод также в случае ошибки может генерировать исключение IOException, то его вызов также помещается во вложенный блок
Начиная с Java 7 можно использовать еще один способ, который автоматически вызывает метод close. Этот способ заключается в использовании конструкции
try-with-resources (try-с-ресурсами). Данная конструкция работает с объектами, которые реализуют интерфейс .
Так как все классы потоков реализуют интерфейс , который в свою очередь наследуется от , то их также можно использовать в данной
конструкции
Итак, перепишем предыдущий пример с использованием конструкции try-with-resources:
import java.io.*; public class Program { public static void main(String[] args) { try(FileInputStream fin=new FileInputStream("C://SomeDir//notes.txt")) { int i=-1; while((i=fin.read())!=-1){ System.out.print((char)i); } } catch(IOException ex){ System.out.println(ex.getMessage()); } } }
Синтаксис конструкции следующий: . Данная конструкция также не исключает
использования блоков .
После окончания работы в блоке try у ресурса (в данном случае у объекта ) автоматически вызывается метод close().
Если нам надо использовать несколько потоков, которые после выполнения надо закрыть, то мы можем указать объекты потоков через точку с запятой:
try(FileInputStream fin=new FileInputStream("C://SomeDir//Hello.txt"); FileOutputStream fos = new FileOutputStream("C://SomeDir//Hello2.txt")) { //.................. }
НазадВперед
Создание потока данных
Последнее обновление: 02.05.2018
Для создания потока данных можно применять различные методы. В качестве источника потока мы можем использовать коллекции. В частности, в JDK 8 в интерфейс
Collection, который реализуется всеми классами коллекций, были добавлены два метода для работы с потоками:
-
: возвращается поток данных из коллекции
-
: возвращается параллельный поток данных из коллекции
Так, рассмотрим пример с ArrayList:
import java.util.stream.Stream; import java.util.*; public class Program { public static void main(String[] args) { ArrayList<String> cities = new ArrayList<String>(); Collections.addAll(cities, "Париж", "Лондон", "Мадрид"); cities.stream() // получаем поток .filter(s->s.length()==6) // применяем фильтрацию по длине строки .forEach(s->System.out.println(s)); // выводим отфильтрованные строки на консоль } }
Здесь с помощью вызова получаем поток, который использует данные из списка cities. С помощью каждой промежуточной операции,
которая применяется к потоку, мы также можем получить поток с учетом модификаций. Например, мы можем изменить предыдущий пример следующим образом:
ArrayList<String> cities = new ArrayList<String>(); Collections.addAll(cities, "Париж", "Лондон", "Мадрид"); Stream<String> citiesStream = cities.stream(); // получаем поток citiesStream = citiesStream.filter(s->s.length()==6); // применяем фильтрацию по длине строки citiesStream.forEach(s->System.out.println(s)); // выводим отфильтрованные строки на консоль
Важно, что после использования терминальных операций другие терминальные или промежуточные операции к этому же потоку не могут быть применены, поток уже употреблен. Например, в следующем случае мы получим ошибку:
citiesStream.forEach(s->System.out.println(s)); // терминальная операция употребляет поток long number = citiesStream.count(); // здесь ошибка, так как поток уже употреблен System.out.println(number); citiesStream = citiesStream.filter(s->s.length()>5); // тоже нельзя, так как поток уже употреблен
Фактически жизненный цикл потока проходит следующие три стадии:
-
Создание потока
-
Применение к потоку ряда промежуточных операций
-
Применение к потоку терминальной операции и получение результата
Кроме вышерассмотренных методов мы можем использовать еще ряд способов для создания потока данных. Один из таких способов представляет метод
Arrays.stream(T[] array), который создает поток данных из массива:
Stream<String> citiesStream = Arrays.stream(new String[]{"Париж", "Лондон", "Мадрид"}) ; citiesStream.forEach(s->System.out.println(s)); // выводим все элементы массива
Для создания потоков IntStream, DoubleStream, LongStream можно использовать соответствующие перегруженные версии этого метода:
IntStream intStream = Arrays.stream(new int[]{1,2,4,5,7}); intStream.forEach(i->System.out.println(i)); LongStream longStream = Arrays.stream(new long[]{100,250,400,5843787,237}); longStream.forEach(l->System.out.println(l)); DoubleStream doubleStream = Arrays.stream(new double[] {3.4, 6.7, 9.5, 8.2345, 121}); doubleStream.forEach(d->System.out.println(d));
И еще один способ создания потока представляет статический метод of(T..values) класса Stream:
Stream<String> citiesStream =Stream.of("Париж", "Лондон", "Мадрид"); citiesStream.forEach(s->System.out.println(s)); // можно передать массив String[] cities = {"Париж", "Лондон", "Мадрид"}; Stream<String> citiesStream2 =Stream.of(cities); IntStream intStream = IntStream.of(1,2,4,5,7); intStream.forEach(i->System.out.println(i)); LongStream longStream = LongStream.of(100,250,400,5843787,237); longStream.forEach(l->System.out.println(l)); DoubleStream doubleStream = DoubleStream.of(3.4, 6.7, 9.5, 8.2345, 121); doubleStream.forEach(d->System.out.println(d));
НазадВперед
Запуск задач с помощью java.util.concurrent.ExecutorService
Облегчив с помощью интерфейса Callable создание задач для параллельного выполнения, пакет java.util.concurrent также берет на себя работу по запуску и остановке потоков. Вместо объекта Thread предлагается использовать объект типа ExecutorService, с помощью которого пользователь может просто поместить задачу в очередь на выполнение и ждать получения результата. Можно сказать, что ExecutorService – это значительно усовершенствованная реализация шаблона WorkerThread.
ExecutorService – это интерфейс, поэтому для выполнения задач используются его конкретные потомки, адаптированные под требования разрабатываемого приложения. Однако программисту нет необходимости создавать собственную реализацию ExecutorService, так как в пакете java.util.concurrent уже присутствуют различные варианты реализации ExecutorService. Доступ к ним можно получить через статические методы служебного класса Executors, метод которого newFixedThreadPool возвращает объект типа ExecutorService со встроенной поддержкой шаблона ThreadPool. Также в классе Executors есть и другие методы для создания объектов ExecutorService с различными свойствами.
Наибольший интерес в ExecutorService представляет метод submit, через который задача ставится в очередь на выполнение. На вход этот метод принимает объект типа Callable или Runnable, а возвращает некий параметризованный объект типа Future. Этот объект можно использовать для доступа к результату выполнения задачи, который будет возвращен из метода call соответствующего Callable-объекта. При этом через объект Future можно проверить, закончено ли уже выполнение задачи – с помощью метода isDone и через метод get получить доступ к результату или исключительной ситуации, если в процессе выполнения задачи произошла ошибка.
Таким образом, при запуске задач с помощью классов из пакета java.util.concurrent не требуется прибегать к низкоуровневой поточной функциональности класса Thread, достаточно создать объект типа ExecutorService с нужными свойствами и передать ему на исполнение задачу типа Callable. Впоследствии можно легко просмотреть результат выполнения этой задачи с помощью объекта Future, как показано в листинге 4.
Листинг 4. Запуск задачи с помощью классов пакета java.util.concurrent
1 public class ExecutorServiceSample { 2 public static void main(String[] args) { 3 //создать ExecutorService на базе пула из пяти потоков 4 ExecutorService es1 = Executors.newFixedThreadPool(5); 5 //поместить задачу в очередь на выполнение 6 Future<String> f1 = es1.submit(new CallableSample()); 7 while(!f1.isDone()) { 8 //подождать пока задача не выполнится 9 } 10 try { 11 //получить результат выполнения задачи 12 System.out.println("task has been completed : " + f1.get()); 13 } catch (InterruptedException ie) { 14 ie.printStackTrace(System.err); 15 } catch (ExecutionException ee) { 16 ee.printStackTrace(System.err); 17 } 18 es1.shutdown(); 19 } 20}
Стоит обратить внимание на строку 18, где происходит остановка объекта ExecutorService с помощью метода shutdown. Дело в том, что потоки в объекте ExecutorService не останавливаются сами, как обычно, поэтому их необходимо явно остановить с помощью этого метода, при этом если в ExecutorService находятся невыполненные задачи, то потоки будут остановлены только, когда завершится последняя задача
Настройка размера пула
Настраивая размер пула потоков, важно избежать двух ошибок: слишком мало потоков или слишком много потоков. К счастью, для большинства приложений спектр между слишком большим и слишком малым количеством потоков довольно широк
Если вы помните, есть два основных преимущества в организации поточной обработки сообщений в приложениях: возможность продолжения процесса во время ожидания медленных операций, таких, как I/O (ввод — вывод), и использование возможностей нескольких процессоров. В приложениях с ограничением по скорости вычислений, функционирующих на N-процессорной машине, добавление дополнительных потоков может улучшить пропускную способность, по мере того как количество потоков подходит к N, но добавление дополнительных потоков свыше N не оправдано. Действительно, слишком много потоков разрушают качество функционирования из-за дополнительных издержек переключения процессов
Оптимальный размер пула потоков зависит от количества доступных процессоров и природы задач в рабочей очереди. На N-процессорной системе для рабочей очереди, которая будет выполнять исключительно задачи с ограничением по скорости вычислений, вы достигните максимального использования CPU с пулом потоков, в котором содержится N или N+1 поток.
Для задач, которые могут ждать осуществления I/O (ввода — вывода) — например, задачи, считывающей HTTP-запрос из сокета – вам может понадобиться увеличение размера пула свыше количества доступных процессоров, потому, что не все потоки будут работать все время. Используя профилирование, вы можете оценить отношение времени ожидания (WT) ко времени обработки (ST) для типичного запроса. Если назвать это соотношение WT/ST, для N-процессорной системе вам понадобится примерно N*(1+WT/ST) потоков для полной загруженности процессоров.
Использование процессора – не единственный фактор, важный при настройке размера пула потоков. По мере возрастания пула потоков, можно столкнуться с ограничениями планировщика, доступной памяти, или других системных ресурсов, таких, как количество сокетов, дескрипторы открытого файла, или каналы связи базы данных
Нет необходимости писать свое
Даг Ли создал отличную открытую библиотеку утилит параллельности, , которая включает объекты-мьютексы, семафоры, коллекции, такие как очереди и хэш-таблицы, хорошо работающие при параллельном доступе, и несколько реализаций рабочей очереди. Класс из этого пакета — эффективная, широко использующаяся, правильная реализация пула потоков, основанного на рабочей очереди. Прежде чем пытаться писать собственное программное обеспечение, которое вполне может оказаться неправильным, вы можете рассмотреть использование некоторых утилит в . Ссылки и дополнительную информацию смотрите в разделе .
Библиотека также служит вдохновителем для JSR 166, рабочей группы Java Community Process (JCP), которая будет производить набор параллельных утилит для включения в библиотеку классов Java в пакете , и которая готовит выпуск Java Development Kit 1.5.
Создание потока с интерфейсом Runnable
Есть более сложный вариант создания потока. Для создания нового потока нужно реализовать интерфейс Runnable. Вы можете создать поток из любого объекта, реализующего интерфейс Runnable и объявить метод run().
Внутри метода run() вы размещаете код для нового потока. Этот поток завершится, когда метод вернёт управление.
Когда вы объявите новый класс с интерфейсом Runnable, вам нужно использовать конструктор:
В первом параметре указывается экземпляр класса, реализующего интерфейс. Он определяет, где начнётся выполнение потока. Во втором параметре передаётся имя потока.
После создания нового потока, его нужно запустить с помощью метода start(), который, по сути, выполняет вызов метода run().
Создадим новый поток внутри учебного проекта в виде вложенного класса и запустим его.
Внутри конструктора MyRunnable() мы создаём новый объект класса Thread
thread = new Thread(this, "Поток для примера");
В первом параметре использовался объект this, что означает желание вызвать метод run() этого объекта. Далее вызывается метод start(), в результате чего запускается выполнение потока, начиная с метода run(). В свою очередь метод запускает цикл для нашего потока. После вызова метода start(), конструктор MyRunnable() возвращает управление приложению. Когда главный поток продолжает свою работу, он входит в свой цикл. После этого оба потока выполняются параллельно.
Можно запускать несколько потоков, а не только второй поток в дополнение к первому. Это может привести к проблемам, когда два потока пытаюсь работать с одной переменной одновременно.
Методы wait и notify
Последнее обновление: 27.04.2018
Иногда при взаимодействии потоков встает вопрос о извещении одних потоков о действиях других. Например, действия одного потока зависят от результата действий другого потока,
и надо как-то известить один поток, что второй поток произвел некую работу. И для подобных ситуаций у класса Object определено ряд методов:
-
wait(): освобождает монитор и переводит вызывающий поток в состояние ожидания до тех пор, пока другой поток не вызовет метод
-
notify(): продолжает работу потока, у которого ранее был вызван метод
-
notifyAll(): возобновляет работу всех потоков, у которых ранее был вызван метод
Все эти методы вызываются только из синхронизированного контекста — синхронизированного блока или метода.
Рассмотрим, как мы можем использовать эти методы. Возьмем стандартную задачу из прошлой темы — «Производитель-Потребитель» («Producer-Consumer»):
пока производитель не произвел продукт, потребитель не может его купить. Пусть производитель должен произвести 5 товаров, соответственно потребитель
должен их все купить. Но при этом одновременно на складе может находиться не более 3 товаров.
Для решения этой задачи задействуем методы и :
public class Program { public static void main(String[] args) { Store store=new Store(); Producer producer = new Producer(store); Consumer consumer = new Consumer(store); new Thread(producer).start(); new Thread(consumer).start(); } } // Класс Магазин, хранящий произведенные товары class Store{ private int product=0; public synchronized void get() { while (product<1) { try { wait(); } catch (InterruptedException e) { } } product--; System.out.println("Покупатель купил 1 товар"); System.out.println("Товаров на складе: " + product); notify(); } public synchronized void put() { while (product>=3) { try { wait(); } catch (InterruptedException e) { } } product++; System.out.println("Производитель добавил 1 товар"); System.out.println("Товаров на складе: " + product); notify(); } } // класс Производитель class Producer implements Runnable{ Store store; Producer(Store store){ this.store=store; } public void run(){ for (int i = 1; i < 6; i++) { store.put(); } } } // Класс Потребитель class Consumer implements Runnable{ Store store; Consumer(Store store){ this.store=store; } public void run(){ for (int i = 1; i < 6; i++) { store.get(); } } }
Итак, здесь определен класс магазина, потребителя и покупателя. Производитель в методе добавляет в объект Store с помощью его метода
6 товаров. Потребитель в методе в цикле обращается к методу объекта Store для получения
этих товаров. Оба метода Store — и являются синхронизированными.
Для отслеживания наличия товаров в классе Store проверяем значение переменной . По умолчанию товара нет, поэтому переменная равна .
Метод — получение товара должен срабатывать только при наличии хотя бы одного товара. Поэтому в методе
проверяем, отсутствует ли товар:
while (product<1)
Если товар отсутсвует, вызывается метод . Этот метод освобождает монитор объекта Store и блокирует выполнение метода get, пока для этого же монитора не будет вызван
метод .
Когда в методе добавляется товар и вызывается , то метод получает монитор и выходит из
конструкции , так как товар добавлен. Затем имитируется получение покупателем товара. Для этого
выводится сообщение, и уменьшается значение product: . И в конце вызов метода дает сигнал методу продолжить работу.
В методе работает похожая логика, только теперь метод должен срабатывать, если в магазине не более трех товаров. Поэтому в цикле проверяется наличие товара, и если товар уже есть,
то освобождаем монитор с помощью и ждем вызова в методе .
И теперь программа покажет нам другие результаты:
Производитель добавил 1 товар Товаров на складе: 1 Производитель добавил 1 товар Товаров на складе: 2 Производитель добавил 1 товар Товаров на складе: 3 Покупатель купил 1 товар Товаров на складе: 2 Покупатель купил 1 товар Товаров на складе: 1 Покупатель купил 1 товар Товаров на складе: 0 Производитель добавил 1 товар Товаров на складе: 1 Производитель добавил 1 товар Товаров на складе: 2 Покупатель купил 1 товар Товаров на складе: 1 Покупатель купил 1 товар Товаров на складе: 0
Таким образом, с помощью в методе мы ожидаем, когда производитель добавит новый продукт. А после добавления
вызываем , как бы говоря, что на складе освободилось одно место, и можно еще добавлять.
А в методе с помощью мы ожидаем освобождения места на складе. После того, как место освободится, добавляем товар и
через уведомляем покупателя о том, что он может забирать товар.
НазадВперед
Класс Thread
В Java функциональность отдельного потока заключается в классе Thread. И чтобы создать новый поток, нам надо создать
объект этого класса. Но все потоки не создаются сами по себе. Когда запускается программа, начинает работать главный поток этой программы.
От этого главного потока порождаются все остальные дочерние потоки.
С помощью статического метода Thread.currentThread() мы можем получить текущий поток выполнения:
public static void main(String[] args) { Thread t = Thread.currentThread(); // получаем главный поток System.out.println(t.getName()); // main }
По умолчанию именем главного потока будет .
Для управления потоком класс Thread предоставляет еще ряд методов. Наиболее используемые из них:
-
getName(): возвращает имя потока
-
setName(String name): устанавливает имя потока
-
getPriority(): возвращает приоритет потока
-
setPriority(int proirity): устанавливает приоритет потока. Приоритет является одним из ключевых факторов для выбора
системой потока из кучи потоков для выполнения. В этот метод в качестве параметра передается числовое значение приоритета — от 1 до 10.
По умолчанию главному потоку выставляется средний приоритет — 5. -
isAlive(): возвращает true, если поток активен
-
isInterrupted(): возвращает true, если поток был прерван
-
join(): ожидает завершение потока
-
run(): определяет точку входа в поток
-
sleep(): приостанавливает поток на заданное количество миллисекунд
-
start(): запускает поток, вызывая его метод
Мы можем вывести всю информацию о потоке:
public static void main(String[] args) { Thread t = Thread.currentThread(); // получаем главный поток System.out.println(t); // main }
Консольный вывод:
Thread
Первое будет представлять имя потока (что можно получить через ), второе значение 5 предоставляет приоритет
потока (также можно получить через ), и последнее представляет имя группы потоков, к которому относится текущий — по умолчанию также main
(также можно получить через )
Недостатки при использовании потоков
Далее мы рассмотрим, как создавать и использовать потоки. Это довольно легко. Однако при создании многопоточного приложения нам следует учитывать ряд обстоятельств,
которые негативно могут сказаться на работе приложения.
На некоторых платформах запуск новых потоков может замедлить работу приложения. Что может иметь большое значение, если нам критичная производительность
приложения.
Для каждого потока создается свой собственный стек в памяти, куда помещаются все локальные переменные и ряд других данных, связанных с выполнением
потока. Соответственно, чем больше потоков создается, тем больше памяти используется. При этом надо помнить, в любой системе размеры используемой памяти ограничены.
Кроме того, во многих системах может быть ограничение на количество потоков. Но даже если такого ограничения нет, то в любом случае
имеется естественное ограничение в виде максимальной скорости процессора.
НазадВперед
Новые возможности пакета java.uti.concurrent
Платформа Java постоянно развивается, и поэтому к существующей функциональности все время добавляются новые возможности. Иногда новая функциональность берется из уже существующих сторонних библиотек, при этом речь не идет о банальном копировании, а скорее о переосмыслении и доработке уже существующих решений. Подобным способом в версию Java 5 был добавлен пакет java.util.concurrent, включающий в себя множество уже проверенных и хорошо зарекомендовавших себя приемов для параллельного выполнения задач (этот пакет — только одно из множества важных нововведений, представленных в Java 5).
В рамках этой статьи интерес представляют уже готовые к использованию реализации шаблонов WorkerThread и ThreadPool, а также еще один способ реализации задач для параллельного выполнения, кроме упоминавшихся класса Thread и интерфейса Runnable. Ещё в пакете java.util.concurrent находятся два подпакета: java.util.concurrent.locks и java.util.concurrent.atomic, с которыми тоже стоит ознакомиться, так как они значительно упрощают организацию взаимодействия между потоками и параллельного доступа к данным.
Проблемы, которые решает многопоточность в Java
-
Одновременно выполнять несколько действий.
В примере выше разные потоки (т.е. члены семьи) параллельно выполняли несколько действий: мыли посуду, ходили в магазин, складывали вещи.
Можно привести и более «программистский» пример. Представь, что у тебя есть программа с пользовательским интерфейсом. При нажатии кнопки «Продолжить» внутри программы должны произойти какие-то вычисления, а пользователь должен увидеть следующий экран интерфейса. Если эти действия осуществляются последовательно, после нажатия кнопки «Продолжить» программа просто зависнет. Пользователь будет видеть все тот же экран с кнопкой «Продолжить», пока все внутренние вычисления не будут выполнены, и программа не дойдет до части, где начнется отрисовка интерфейса.
Что ж, подождем пару минут!
А еще мы можем переделать нашу программу, или, как говорят программисты, «распараллелить». Пусть нужные вычисления выполняются в одном потоке, а отрисовка интерфейса — в другом. У большинства компьютеров хватит на это ресурсов. В таком случае программа не будет «тупить», и пользователь будет спокойно переходить между экранами интерфейса не заботясь о том, что происходит внутри. Одно другому не мешает 🙂
-
Ускорить вычисления.
Тут все намного проще. Если наш процессор имеет несколько ядер, а большинство процессоров сейчас многоядерные, список наших задач могут параллельно решать несколько ядер. Очевидно, что если нам нужно решить 1000 задач и каждая из них решается за секунду, одно ядро справится со списком за 1000 секунд, два ядра — за 500 секунд, три — за 333 с небольшим секунды и так далее.
java.lang.Thread
I’m Thread! My name is Thread-2
I’m Thread! My name is Thread-1
I’m Thread! My name is Thread-0
I’m Thread! My name is Thread-3
I’m Thread! My name is Thread-6
I’m Thread! My name is Thread-7
I’m Thread! My name is Thread-4
I’m Thread! My name is Thread-5
I’m Thread! My name is Thread-9
I’m Thread! My name is Thread-8I’m Thread! My name is Thread-0
I’m Thread! My name is Thread-4
I’m Thread! My name is Thread-3
I’m Thread! My name is Thread-2
I’m Thread! My name is Thread-1
I’m Thread! My name is Thread-5
I’m Thread! My name is Thread-6
I’m Thread! My name is Thread-8
I’m Thread! My name is Thread-9
I’m Thread! My name is Thread-7I’m Thread! My name is Thread-0
I’m Thread! My name is Thread-3
I’m Thread! My name is Thread-1
I’m Thread! My name is Thread-2
I’m Thread! My name is Thread-6
I’m Thread! My name is Thread-4
I’m Thread! My name is Thread-9
I’m Thread! My name is Thread-5
I’m Thread! My name is Thread-7
I’m Thread! My name is Thread-8