using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace SleepSort
{
/// <summary>
/// PartialInsertionSort のアルゴリズム、単一スレッドで実行するとただのバブル ソートなわけですが。
///
/// SortNode を全部並列に実行出来るのであれば話は別。
/// N 要素を N スレッドで実行できれば、O(N) になる。
///
/// 要素数 N が現実的に並列実行可能な数になっていると思えず実は非実用的なのだけども、
/// スリープ ソートで要素数分だけスレッド立てるくらいなら、こちらの方が何ぼかマシなのではという。
/// </summary>
/// <remarks>
/// 実行してみるとわかるものの、ものすごく遅い。
/// (それでも、Sleep するよりはましか。)
///
/// スレッドの生成とか、スレッド間の排他のためのロックとかが大量に必要になるのでどうあがいてもパフォーマンス出ない。
/// </remarks>
public static class ConcurrentSort
{
/// <summary>
/// PartialInsertionSort.Run と同じことをちゃんとスレッド立ててやる。
/// </summary>
public static void Run()
{
var data = TestData.GenerateShuffled(count: 100);
foreach (var x in Sort(data))
{
Console.WriteLine(x);
}
}
/// <summary>
/// 要素数分だけスレッドを立ててソート!
/// </summary>
/// <param name="data">ソート対象のデータ列。</param>
/// <returns>ソート済みデータ列。</returns>
public static IEnumerable<int> Sort(this IEnumerable<int> data)
{
var count = data.Count();
var ct = new CancellationTokenSource();
BlockingCollection<int> queueFirst;
BlockingCollection<int> queueLast;
BeginSortNodes(count, ct, out queueFirst, out queueLast);
// 入力キューに投函。
Task.Factory.StartNew(() =>
{
foreach (var x in data.Concat(Enumerable.Repeat(int.MaxValue, int.MaxValue)))
{
queueFirst.Add(x);
if (ct.Token.IsCancellationRequested) return;
}
});
// 出力キューから結果取り出し。
foreach (var x in queueLast.GetConsumingEnumerable().SkipWhile(x => x == int.MinValue).TakeWhile(x => x != int.MaxValue))
{
yield return x;
}
ct.Cancel();
}
/// <summary>
/// 必要な数だけノード スレッドを生成。
/// </summary>
/// <param name="count">必要なノード数。</param>
/// <param name="ct">スレッド停止用のトークン。</param>
/// <param name="queueFirst">先頭キュー(ソート元を渡すキュー)。</param>
/// <param name="queueLast">末尾キュー(ソート結果を受け取るキュー)。</param>
private static void BeginSortNodes(int count, CancellationTokenSource ct, out BlockingCollection<int> queueFirst, out BlockingCollection<int> queueLast)
{
var queues = Enumerable.Range(0, count + 1).Select(_ => new BlockingCollection<int>()).ToArray();
for (int i = 0; i < count; i++)
{
var qIn = queues[i];
var qOut = queues[i + 1];
BeginSortNode(qIn, qOut, ct.Token);
}
queueFirst = queues[0];
queueLast = queues[count];
}
/// <summary>
/// ノードを実行するスレッドを1つ立てる。
/// </summary>
/// <param name="queueIn">入力キュー。</param>
/// <param name="queueOut">出力キュー。</param>
/// <param name="ct">スレッド停止用のトークン。</param>
private static void BeginSortNode(BlockingCollection<int> queueIn, BlockingCollection<int> queueOut, CancellationToken ct)
{
var t = new Thread(_ => SortNode(queueIn, queueOut, ct));
t.Start();
}
/// <summary>
/// SortNode マルチスレッド版。
/// このメソッドを複数スレッド化して使う。
/// 前ノードからの入力、次ノードへの出力は BlockingCollection を通して行う。
/// </summary>
/// <param name="queueIn">入力キュー。</param>
/// <param name="queueOut">出力キュー。</param>
/// <param name="ct">スレッド停止用のトークン。</param>
private static void SortNode(BlockingCollection<int> queueIn, BlockingCollection<int> queueOut, CancellationToken ct)
{
int prev = int.MinValue;
while (true)
{
foreach (var x in queueIn.GetConsumingEnumerable())
{
if (prev < x)
{
queueOut.Add(prev);
prev = x;
}
else
{
queueOut.Add(x);
}
if (ct.IsCancellationRequested) return;
}
}
}
}
}