ForkJoinTask iptal mekanizması zorunlu iş parçacığı kesmesi yerine iş birliğiyle bir bayrağa dayanmaktadır. Bu, cancel() yönteminin yalnızca görevlerin sonlandırma taleplerini gözlemlemek için açıkça sorgulaması gereken dahili bir değişken durumu ayarladığı anlamına gelir. Sonuç olarak, bu tasarım FileChannel okumaları veya soket InputStream işlemleri gibi monolitik G/Ç işlemleri için bekleyen iş parçacıklarını serbest bırakmamaktadır. Bu bloklama çağrıları iptal bayrağını kontrol etmez ve standart iş parçacığı kesme mekanizmalarıyla kesilemez.
İşçilerin bloklandığı durumlarda havuz açlığına karşı önlem almak için ForkJoinPool.managedBlock API'si geliştiricilerin bir ForkJoinPool.ManagedBlocker örneği kaydetmesine olanak tanır. Bu engelleyici, havuzu telafi eden bir iş parçacığı başlatması için işaret eder, böylece bloklama işine rağmen hedef paralellik seviyesi korunur. Engelleyicinin isReleasable yöntemi, iptal durumunu kontrol etmek veya engellenen işlemi programlı olarak kesmek için bir kanca sağlar. Bu, havuzun cevapsız G/Ç üzerinde iş parçacığı bütçesini tüketmek yerine zarif bir şekilde bozulmasını sağlar.
Bu sınırlamayla, özel bir RecursiveTask içinde Files.lines() kullanan paralel bir günlük işlemcisi oluştururken karşılaştık. Görev, bir ağdan monte edilmiş depolama aygıtından terabayt ölçeğinde günlük dosyalarını ayrıştırdı. Kullanıcılar uzun süren analiz görevlerinin iptalini talep ettiğinde, ForkJoinPool iş parçacıkları bloklama read() sistem çağrılarında dakikalarca takılı kaldı. İptal bayrağını tamamen görmezden geldiler, yeni görevlerin başlamasını engelleyerek ciddi iş parçacığı açlığına neden oldular.
Kilitli durumu çözmek için üç farklı yaklaşım değerlendirildi. İlk yaklaşım tamamen ForkJoinPool'dan vazgeçmek ve bir önbellekli ThreadPoolExecutor'a geçmekti. Bu, daha basit kesme anlamsallığı ve hemen iş parçacığı değiştirme sundu, ancak CPU yoğun ayrıştırma aşamalarımız için kritik olan iş çalma verimliliğinden ödün verdi.
İkinci yaklaşım, her G/Ç çağrısını Thread.interrupt() mantığına sararak ve SocketChannel gibi kesilebilir kanallara geçmeyi öneriyordu. Bu derhal iptali desteklese de, standart bloklama akışlarına ve üçüncü taraf ayrıştırıcılara bağımlı miras kitaplık kodu ile uyumsuz ve müdahaleci oldu.
Üçüncü yaklaşım, dosya okuma döngüsünü saran özel bir ManagedBlocker uygulayarak ForkJoinPool.managedBlock kullanmaktı. Bu engelleyici, bloklayıcı protokol aracılığıyla havuzun telafi eden iş parçacıkları yaratmasını sağlarken, belli aralıklarla isCancelled() kontrol etti. Üçüncü çözümü seçtik çünkü mevcut paralel akış mimarisini korudu ve havuza bloklama işlemleri hakkında açıkça bilgi verdi. Bu, iptal yanıt verebilirliğini ve verimliliği dengelemenin yanı sıra tüm G/Ç katmanını yeniden yazmadan gerçekleştirildi.
Sonuç olarak, iptal talepleri saniyeler içinde yayılmaya başladı ve havuz, manuel yapılandırma gerektirmeden G/Ç pikleri sırasında elli iş parçacığına kadar dinamik olarak ölçeklendi. CPU doygunluğu, iş yükü boyunca yüksek kaldı ve görev sonlandırma, ağır ağ yoğunluğu dönemlerinde bile güvenilir hale geldi.
ForkJoinPool iş parçacığı bloklamayı nasıl tespit eder ve telafi iş parçacıkları için eşik nedir?**
Havuz, aktif ve park edilmiş iş parçacıkları arasındaki sayıları temsil eden 64-bit bir ctl alanı aracılığıyla işçi iş parçacığı durumlarını takip eder. İş parçacıkları görevleri yürütürken "aktif" olarak sayılır, ancak programcı ipuçları olmadan CPU yoğun çalışmaları ile bloklama G/Ç arasında ayrım yapamaz. Bir işçi, managedBlock kullanmadan senkronizasyon monitörü veya G/Ç üzerinde bloklandığında, havuz yalnızca çalınabilecek işlerin ve mevcut işçi sayısının azalmasını gözlemler. Paralellik seviyesi ulaştığında ve ilerleme sinyalleri gelmediğinde, sonunda duraksayabilir. Telafi iş parçacıkları yalnızca managedBlock çağrıldığında veya iç JVM bloklaması Unsafe.park sayaçları aracılığıyla tespit edildiğinde güvenilir bir şekilde başlatılır, ancak varsayılan eşik belirsiz ve özel bloklama kodu için güvenilmezdir.
Neden ForkJoinTask.join() görev iptal edildiğinde hemen dönmez ve nasıl Future.get() ile zaman aşımı arasında farklılık gösterir?
join() içten içe doJoin() yöntemini çağırır, bu da çağıran iş parçacığının hedef görevi tamamlanana kadar başka işleri yürütmesini veya çalmasını sağlayan bir "yardımcı" mekanizmasını uygular. Bu, iptal durumuna bakılmaksızın gerçekleşir; çünkü iptal yalnızca yeni alt görevlerin fork edilmesini engeller ve bir tamamlama bayrağı ayarlar. Yöntem, beklemeden önce iptal bayrağını kontrol etmez ve girişinde CancellationException atmaz. Buna karşılık, ForkJoinTask üzerinde Future.get() (ki bu Future'ı uygular) iptal durumunu derhal kontrol eder ve beklemeden CancellationException fırlatabilir. Bu ayrım önemlidir çünkü join() havuz içi iş birliği için tasarlanmıştır, get() ise standart Future anlamsallığı bekleyen dış istemciler içindir.
ForkJoinPool'un paralellik seviyesi ile Runtime.availableProcessors() arasındaki etkileşim nedir ve bloklama işlemleri için paralelliği mevcut işlemci sayısının üzerine ayarlamak neden verimliliği artırabilir?**
Varsayılan ortak havuz, bir çekirdek uygulama iş parçacığı veya çöp toplama için ayırmak üzere availableProcessors() - 1 ile başlatılır. Paralellik, etkin iş parçacıklarının hedef sayısını tanımlar, katı bir maksimum değildir; havuz, bloklama çalışmaları olduğunda managedBlock işaretiyle daha fazla iş parçacığı oluşturabilir, ancak yalnızca paralellik iş parçacığı gerçekten etkin olmayı hedefler. Bloklama işlemleri için, paralelliği çekirdek sayısının üzerinde ayarlamak (örneğin, 2x veya 3x çekirdek) planlayıcının CPU'ları meşgul tutmasına olanak tanır; bu, diğer iş parçacıkları G/Ç beklerken, her çekirdek için çalıştırılabilir görevlerin var olmasını sağlamak için iş parçacığı başına çekirdek kısıtlamasını modellemektedir. Ancak, bu, bloklama oranı yanlış tahmin edildiğinde aşırı bağlam geçişi aşamasını önlemek için dikkatli ayarlamalar gerektirir.