.Net4.0以降の非同期処理について

フォーラム(掲示板)ルール
フォーラム(掲示板)ルールはこちら  ※コードを貼り付ける場合は [code][/code] で囲って下さい。詳しくはこちら
アバター
kazuoni
記事: 17
登録日時: 14年前
住所: 愛知
連絡を取る:

.Net4.0以降の非同期処理について

#1

投稿記事 by kazuoni » 10年前

お久しぶりです。
現在非同期処理の実装で悩んでおります。

メインスレッドからDictionaly<key, value>が次々に生成されてきます。
その個々のDictionaryすべてで、Dictionaryが持つvalueを使って、重たい処理(Calculation method)
を行います。その処理から得られる結果を別のDictionary<key, result>に蓄えていき、
終了のシグナルが出たらメインスレッドでDictionary<key, result>を使いたいと考えております。

ここで行う計算が重たい処理であるので非同期で行いたいのですが、具体的にどのように実装していけばいいのか
がわかりません。

流れとしては
1 スタートシグナルを受け取ったら、サブスレッドはDictionary<key, value>を監視
2 Dictionaryに追加があればデータをポップし、Taskで非同期で開始
3 終了したらDictionary<key, result>をプッシュ
4 ストップシグナルを受け取ったらサブスレッド中止

このような流れを考えていましたが、Taskが非同期で開始するとデータ数が1000, 10000となったら
スレッド数も比例して増大していきます。
なのであまり効率は良くありませんが、一つの計算が終わるのを待って、終了次第、データポップを
行う処理しようと考えております。

環境は.Net4.0を想定しております。
そもそもの話になりますが、これがベストな処理の方法なのでしょうか?
ご意見、ご回答いただけると助かります。

sleep

Re: .Net4.0以降の非同期処理について

#2

投稿記事 by sleep » 10年前

TaskはThreadPoolのスレッドで実行されます。
ThreadPoolへスレッド数の上限を設定すれば
生成する数を制限することができます。
ThreadPool.SetMaxThreads

YuO
記事: 947
登録日時: 14年前
住所: 東京都世田谷区

Re: .Net4.0以降の非同期処理について

#3

投稿記事 by YuO » 10年前

単純にデータ並列であるのならば,作成側はデータをBlockingCollectionに放り込んでおいて,処理側はGetConsumingEnumerableAsParallelして取り出せばよいでしょう。
ここまでは典型的なConsumer-Producer Patternだと思います。

生成されたデータに関しては,処理を行うSelectの直後にSelectなりForAllなりした中でUIスレッドへコールバックして処理をすることになると思います。


ちなみに,PLINQを利用した場合,デフォルトでは,論理CPU数 (但し,最大64まで) の並列化が行われます。
当然ながら,別にTaskを生成したり,Threadを生成したりすれば,それ以上のタスクが動作しますが……。

Example) Consumer-Producer Pattern

コード:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
	static void Main ()
	{
		// 標準のキャンセル機構を使うための準備
		var source = new CancellationTokenSource();
		var token = source.Token;

		var currentCount = 0; // 処理中の要素数
		var maxConcurrent = 0; // 処理の最大同時実行数
		
		var workQueue = new BlockingCollection<int>(); // 入力キュー
		var resultQueue = new BlockingCollection<int>(); // 結果キュー

		// ワーカータスク。
		// Producer-Consumer Patternを(BlockingCollectionを通じて)使っているため,それに準じた名前をつけている。
		var consumerTask = Task.Factory.StartNew(() =>
		{
			maxConcurrent =
				workQueue.GetConsumingEnumerable() // 追加されたら取ってくる
				.AsParallel() // 並列化
				.Select(value =>
				{
					// 実際の内部処理

					if (token.IsCancellationRequested)
					{
						// キャンセルされていたら終了する
						return -1;
					}

					var resultValue = Interlocked.Increment(ref currentCount);

					// 重い処理のかわり
					var finish = DateTime.Now + TimeSpan.FromMilliseconds(1000 + value % 1000);
					while (finish > DateTime.Now)
					{
						Thread.SpinWait(10000);
					}

					resultQueue.Add(resultValue);
					Interlocked.Decrement(ref currentCount);

					return resultValue;
				})
				.Max();
			resultQueue.CompleteAdding();

			Console.WriteLine("Consumer Task Finished.");
		});

		// データを生成する
		for (var i = 0; i < 1000; ++i)
		{
			workQueue.Add(i);
		}
		workQueue.CompleteAdding(); // 生成の終了を通知

		Console.ReadLine(); // Enterを押すとキャンセル可能
		source.Cancel();

		Task.WaitAll(consumerTask); // タスクの終了待ち

		Console.WriteLine("Processed : {0}", resultQueue.Count);
		Console.WriteLine("Maximum   : {0}", maxConcurrent);
		if (System.Diagnostics.Debugger.IsAttached)
		{
			Console.ReadLine(); // デバッグ実行時はEnterを待つことで,表示確認を可能にする
		}
	}
}
Example Result) 物理4コア,論理8コアの場合 (途中で打ち切り)

コード:


Consumer Task Finished.
Processed : 48
Maximum   : 8

アバター
kazuoni
記事: 17
登録日時: 14年前
住所: 愛知
連絡を取る:

Re: .Net4.0以降の非同期処理について

#4

投稿記事 by kazuoni » 10年前

sleepさん、YuOさんご回答ありがとうございました。

Consumer-Producerパターンなんてあったんですね。
全く知りませんでした。
非同期処理を最近勉強しだして(今更)悪戦苦闘しておりますが、このようなサンプルをいただけると
勉強する身としては大変助かります。
個人的には非同期処理をしない構造を考えたいのですが、今回のケースは明らかに無理そうだったので・・・

ありがとうございました。

閉鎖

“C言語何でも質問掲示板” へ戻る