Foreach的全部学问要点

by admin on 2019年11月26日

简介

当须要为多核机器进行优化的时候,最佳先反省下您的顺序是不是有处理能够分割开来展开并行处理。(比如,有二个壮烈的数据集结,个中的因素要求二个三个实行互相独立的耗费时间测算卡塔 尔(英语:State of Qatar)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ
来扶持大家进行并行管理,本文研讨这两侧的异样及适用的情景。

Parallel.ForEach

Parallel.ForEach 是 foreach
的八线程完毕,他们都能对 IEnumerable<T>
类型对象实行遍历,Parallel.ForEach
的特有的地方在于它选取八线程来施行循环体内的代码段。

Parallel.ForEach 最常用的花样如下:

public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    Action<TSource> body)

PLINQ

PLINQ 也是生机勃勃种对数据开展并行管理的编制程序模型,它经过 LINQ 的语法来贯彻相仿Parallel.ForEach 的八线程并行管理。

场馆风华正茂:轻便数据 之 独立操作的并行管理(使用 Parallel.ForEach卡塔 尔(阿拉伯语:قطر‎

身体力行代码:

public static void IndependentAction(IEnumerable<T> source, Action<T> action)
{
    Parallel.ForEach(source, element => action(element));
}

理由:

  1. 虽说 PLINQ 也提供了叁个好像的 ForAll
    接口,但它对于简易的单身操作太重量化了。

  2. 应用 Parallel.ForEach 你仍可以够设定
    ParallelOptions.MaxDegreeOfParalelism
    参数(内定最多必要有个别个线程卡塔尔,那样当 ThreadPool
    财富缺少(甚至当可用线程数<马克斯DegreeOfParalelism卡塔 尔(阿拉伯语:قطر‎的时候, Parallel.ForEach
    还是能够够胜利运行,何况当后续有越来越多可用线程现身时,Parallel.ForEach
    也能立刻地动用那一个线程。PLINQ 只能通过WithDegreeOfParallelism
    方法来供给一定的线程数,即:供给了多少个正是多少个,不会多也不会少。

意况二:顺序数据 之 并行管理(使用 PLINQ 来维周到据顺序卡塔尔

当输出的数据体系须求保持原本的顺序时利用 PLINQ 的 AsOrdered
方法十分轻便高效。

演示代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
    var ProcessedMovie =
        Movie
        .AsParallel()
        .AsOrdered()
        .Select(frame => ConvertToGrayscale(frame));

    foreach (var grayscaleFrame in ProcessedMovie)
    {
        // Movie frames will be evaluated lazily
    }
}

理由:

  1. Parallel.ForEach
    达成起来必要绕一些弯路,首先你须求运用以下的重载在艺术:

    public static ParallelLoopResult ForEach(

     IEnumerable<TSource> source,
     Action<TSource, ParallelLoopState, Int64> body)
    

其黄金年代重载的 Action 多带有了 index
 参数,这样你在输出的时候就能够选择那些值来保险原本的行列顺序。请看下边包车型大巴例子:

public static double [] PairwiseMultiply(double[] v1, double[] v2)
{
    var length = Math.Min(v1.Length, v2.Lenth);
    double[] result = new double[length];
    Parallel.ForEach(v1, (element, loopstate, elementIndex) =>
        result[elementIndex] = element * v2[elementIndex]);
    return result;
}

你可能已经意识到那边有个醒指标主题材料:我们运用了定点长度的数组。假使传入的是
IEnumerable 那么您有4个缓慢解决方案:

(1) 调用 IEnumerable.Count()
来获取数据长度,然后用那么些值实例化叁个定位长度的数组,然后选取上例的代码。

(2) The second option would be to materialize the original collection
before using it; in the event that your input data set is prohibitively
large, neither of the first two options will be
feasible.(没看懂贴原作卡塔 尔(阿拉伯语:قطر‎

(3)
第三种办法是使用重回多少个哈希集结的章程,这种措施下平时需求最少2倍于传播数据的内部存款和储蓄器,所以拍卖大数量时请慎用。

(4) 本人达成排序算法(保险传入数据与传播数据经过排序后次序风流倜傥致卡塔尔国

  1. 相比 PLINQ 的 AsOrdered
    方法这么简约,並且该办法能管理流式的数据,进而允许传入数据是延迟落到实处的(lazy materialized卡塔尔

情景三:流数据 之 并行处理(使用 PLINQ卡塔尔

PLINQ 能输出流数据,那性格子在须臾间场子特别实惠:

1.
结出集不需借使三个整机的管理实现的数组,即:任曾几何时刻点下内存中仅维持数组中的部分新闻

  1. 您可以看到在三个单线程上遍历输出结果(就形似他们早就存在/管理完了卡塔 尔(阿拉伯语:قطر‎

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
    var StockRiskPortfolio =
        Stocks
        .AsParallel()
        .AsOrdered()
        .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
        .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));

    foreach (var stockRisk in StockRiskPortfolio)
    {
        SomeStockComputation(stockRisk.Risk);
        // StockRiskPortfolio will be a stream of results
    }
}

这边运用三个单线程的 foreach 来对 PLINQ 的出口实行三番一回管理,平日景况下
foreach 无需翘首以待 PLINQ 管理完全数数据就会开头运维。

PLINQ 也同意钦点输出缓存的点子,具体可参谋 PLINQ 的 WithMergeOptions
方法,及 ParallelMergeOptions 枚举

情景四:管理多个聚众(使用 PLINQ卡塔尔

PLINQ 的 Zip
方法提供了还要遍历五个会集併开展重新组合元算的法子,何况它能够与其他查询管理操作结合,完毕特别复杂的效能。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    return
        a
        .AsParallel()
        .AsOrdered()
        .Select(element => ExpensiveComputation(element))
        .Zip(
            b
            .AsParallel()
            .AsOrdered()
            .Select(element => DifferentExpensiveComputation(element)),
            (a_element, b_element) => Combine(a_element,b_element));
}

示范中的五个数据源能够并行管理,当双方都有贰个可用成分时提要求 Zip
实行持续管理(Combine)。

Parallel.ForEach 也能贯彻雷同的 Zip 管理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    var numElements = Math.Min(a.Count(), b.Count());
    var result = new T[numElements];
    Parallel.ForEach(a,
        (element, loopstate, index) =>
        {
            var a_element = ExpensiveComputation(element);
            var b_element = DifferentExpensiveComputation(b.ElementAt(index));
            result[index] = Combine(a_element, b_element);
        });
    return result;
}

自然使用 Parallel.ForEach
后你就得投机认可是否要保全原有种类,而且要小心数组越界访谈的问题。

场景五:线程局地变量

Parallel.ForEach 提供了叁个线程局部变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal,TLocal> body,
    Action<TLocal> localFinally)

接受的示范:

public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
    var results = new List<R>();
    using (SemaphoreSlim sem = new SemaphoreSlim(1))
    {
        Parallel.ForEach(source,
            () => new List<R>(),
            (element, loopstate, localStorage) =>
            {
                bool filter = filterFunction(element);
                if (filter)
                    localStorage.Add(element);
                return localStorage;
            },
            (finalStorage) =>
            {
                lock(myLock)
                {
                    results.AddRange(finalStorage)
                };
            });
    }
    return results;
}

线程局地变量有哪些优势呢?请看上边包车型大巴例子(多个网页抓取程序卡塔 尔(阿拉伯语:قطر‎:

public static void UnsafeDownloadUrls ()
{
    WebClient webclient = new WebClient();
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

平常性第少年老成版代码是那样写的,但是运营时会报错“System.NotSupportedException
-> WebClient does not support concurrent I/O
operations.”。那是因为三个线程不能够同时做客同四个 WebClient
对象。所以我们会把 WebClient 对象定义到线程中来:

public static void BAD_DownloadUrls ()
{
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            WebClient webclient = new WebClient();
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

改进之后照旧不正常,因为您的机器不是服务器,多量实例化的 WebClient
急忙达到你机器允许的伪造连接上限数。线程局地变量能够解决那个主题素材:

public static void downloadUrlsSafe()
{
    Parallel.ForEach(urls,
        () => new WebClient(),
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, filenames[index]+".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            return webclient;
        },
            (webclient) => { });
}

与此相类似的写法保障了小编们能获得丰富的 WebClient 实例,同时那个 WebClient
实例互相隔绝仅仅归于个别关联的线程。

虽说 PLINQ 提供了 ThreadLocal<T> 对象来兑现相似的功力:

public static void downloadUrl()
{
    var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
    var res =
        urls
        .AsParallel()
        .ForAll(
            url =>
            {
                webclient.Value.DownloadFile(url, host[url] +".dat"));
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            });
}

可是请小心:ThreadLocal<T> 相对来说花费更加大!

场景五:退出操作 (使用 Parallel.ForEach卡塔 尔(英语:State of Qatar)

Parallel.ForEach 有个重载评释如下,在这之中包涵贰个 ParallelLoopState 对象:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop()
提供了退出循环的法门,这种措施要比其他二种方式越来越快。这么些方式布告循环不要再开发银行实行新的迭代,并尽量快的临盆循环。

ParallelLoopState.IsStopped 属性可用来决断其余迭代是否调用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var matchFound = false;
    Parallel.ForEach(TSpace,
        (curValue, loopstate) =>
            {
                if (curValue.Equals(match) )
                {
                    matchFound = true;
                    loopstate.Stop();
                }
            });
    return matchFound;
}

ParallelLoopState.Break() 公告循环继续推行本成分前的迭代,但不实践本元素之后的迭代。最前调用
Break 的起效用,并被记录到 ParallelLoopState.LowestBreakIteration
属性中。这种处理情势常常被接收在二个平稳的检索管理中,比方你有几个排序过的数组,你想在里面查找相配成分的相当的小index,那么可以使用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var loopResult = Parallel.ForEach(source,
        (curValue, loopState, curIndex) =>
        {
            if (curValue.Equals(match))
            {
                loopState.Break();
            }
         });
    var matchedIndex = loopResult.LowestBreakIteration;
    return matchedIndex.HasValue ? matchedIndex : -1;
}

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图