关于C#:帮助合并向量的算法

关于C#:帮助合并向量的算法

Help with algorithm for merging vectors

我需要一个非常快速的算法来完成以下任务。我已经实现了几种算法来完成它,但是它们对于我所需的性能来说太慢了。它应该足够快,以使算法可以在现代CPU上每秒至少运行100,000次。它将在C中实现。

我正在使用跨度/范围,这是一种在一条线上具有起点和终点坐标的结构。

我有两个跨度的向量(动态数组),我需要将它们合并。一个向量是src,另一向量是dst。向量按跨度开始坐标排序,并且跨度在一个向量内不重叠。

必须将src向量中的跨度与dst向量中的跨度合并,以使所得向量仍被排序且没有重叠。 IE。如果在合并过程中检测到重叠,则将两个跨度合并为一个。 (合并两个跨度仅是更改结构中的坐标的问题。)

现在,还有一个问题,在合并过程中,必须"扩大" src向量中的跨度。这意味着一个常量将被添加到src中每个跨度的开始坐标,另一个(较大)常量被添加到结束坐标。这意味着在src跨度扩大后,它们可能会重叠。

到目前为止,我得出的结论是它不能完全就地完成,需要某种临时存储。我认为它应该在线性时间内超过src和dst元素的总和是可行的。

该算法的多次运行之间可能共享任何临时存储。

我尝试过的两种主要方法(太慢了)是:

  • 将src的所有元素附加到dst,然后在附加每个元素之前将其扩展。然后运行就地排序。最后,使用"读"和"写"指针对结果向量进行迭代,使读指针在写指针之前运行,并合并跨度。当所有元素都已合并(读取指针到达末尾)时,dst将被截断。

  • 创建一个临时工作向量。如上所述,通过反复从src或dst中选取下一个元素并合并到工作向量中,进行幼稚的合并。完成后,将工作向量复制到dst,以替换它。

  • 第一种方法的问题是排序是O((m n)* log(m n))而不是O(m n),并且有一些开销。这也意味着dst向量必须增长得比实际需要大得多。

    第二个主要问题是大量复制并再次分配/取消分配内存。

    如果需要,可以更改用于存储/管理跨度/矢量的数据结构。

    更新:忘记说数据集有多大。在两个向量中,最常见的情况是在4到30个元素之间,并且dst为空或src和dst的跨度之间存在大量重叠。


    我们知道,绝对最佳的运行时是O(m n),这是因为您至少必须扫描所有数据才能合并列表。鉴于此,您的第二种方法应为您提供这种类型的行为。

    您是否介绍了第二种方法来找出瓶颈?实际上,根据您正在谈论的数据量,实际上不可能在指定的时间内完成您想要的操作。一种验证方法是做一些简单的事情,例如对循环中每个向量中跨度的所有开始和结束值求和,并对时间进行计时。基本上,这里您对向量中的每个元素都进行了最少的工作。这将为您提供可望获得的最佳性能的基准。

    此外,您可以避免使用stl swap方法逐个复制vectors,并且可以将temp向量预分配为一定大小,以避免合并元素时触发数组扩展。铅>

    您可能会考虑在系统中使用2个向量,并且每当需要进行合并时,都将合并到未使用的向量中,然后交换(这类似于图形中使用的双缓冲)。这样,您不必在每次合并时都重新分配向量。

    但是,最好先进行概要分析,然后找出瓶颈。如果与实际的合并过程相比分配量很少,那么您需要弄清楚如何更快地进行分配。

    直接访问向量原始数据可能会带来一些额外的提速,从而避免了每次访问数据时的边界检查。


    您的目标系统是什么?是多核吗?如果是这样,您可以考虑对该算法进行多线程处理


    我将始终对跨度矢量进行排序。这使得实现算法更容易-并且可以在线性时间内完成。

    好,所以我根据以下内容对跨度进行排序:

    • 最小跨度按升序排列
    • 然后以降序跨越最大值

    您需要创建一个函数来执行此操作。

    然后,我将使用std :: set_union合并向量(在继续之前,您可以合并多个向量)。

    然后对于具有相同最小值的每个连续范围的跨度,保留第一个范围并删除其余部分(它们是第一个范围的子范围)。

    然后,您需要合并跨度。现在应该可以在线性时间内实现了。

    好的,这就是窍门。不要尝试就地执行此操作。使用一个或多个临时向量(并提前保留足够的空间)。然后最后调用std :: vector :: swap将结果放入您选择的输入向量中。

    我希望这足以使您前进。


    我专门针对此算法编写了一个新的容器类,以适应需要。这也使我有机会调整程序周围的其他代码,这些代码同时提高了一点速度。

    这比使用STL向量的旧实现要快得多,但在其他方面基本上是相同的。但是,虽然速度更快,但仍然还不够快...不幸的是。

    分析不再显示什么是真正的瓶颈。 MSVC探查器有时有时会把错误归咎于错误的调用(假设相同的运行分配了截然不同的运行时间),并且大多数调用已合并为一个大问题。

    查看生成的代码的反汇编表明,生成的代码中存在大量跳跃,我认为这可能是现在速度缓慢的主要原因。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    class SpanBuffer {
    private:
        int *data;
        size_t allocated_size;
        size_t count;

        inline void EnsureSpace()
        {
            if (count == allocated_size)
                Reserve(count*2);
        }

    public:
        struct Span {
            int start, end;
        };

    public:
        SpanBuffer()
            : data(0)
            , allocated_size(24)
            , count(0)
        {
            data = new int[allocated_size];
        }

        SpanBuffer(const SpanBuffer &src)
            : data(0)
            , allocated_size(src.allocated_size)
            , count(src.count)
        {
            data = new int[allocated_size];
            memcpy(data, src.data, sizeof(int)*count);
        }

        ~SpanBuffer()
        {
            delete [] data;
        }

        inline void AddIntersection(int x)
        {
            EnsureSpace();
            data[count++] = x;
        }

        inline void AddSpan(int s, int e)
        {
            assert((count & 1) == 0);
            assert(s >= 0);
            assert(e >= 0);
            EnsureSpace();
            data[count] = s;
            data[count+1] = e;
            count += 2;
        }

        inline void Clear()
        {
            count = 0;
        }

        inline size_t GetCount() const
        {
            return count;
        }

        inline int GetIntersection(size_t i) const
        {
            return data[i];
        }

        inline const Span * GetSpanIteratorBegin() const
        {
            assert((count & 1) == 0);
            return reinterpret_cast<const Span *>(data);
        }

        inline Span * GetSpanIteratorBegin()
        {
            assert((count & 1) == 0);
            return reinterpret_cast<Span *>(data);
        }

        inline const Span * GetSpanIteratorEnd() const
        {
            assert((count & 1) == 0);
            return reinterpret_cast<const Span *>(data+count);
        }

        inline Span * GetSpanIteratorEnd()
        {
            assert((count & 1) == 0);
            return reinterpret_cast<Span *>(data+count);
        }

        inline void MergeOrAddSpan(int s, int e)
        {
            assert((count & 1) == 0);
            assert(s >= 0);
            assert(e >= 0);

            if (count == 0)
            {
                AddSpan(s, e);
                return;
            }

            int *lastspan = data + count-2;

            if (s > lastspan[1])
            {
                AddSpan(s, e);
            }
            else
            {
                if (s < lastspan[0])
                    lastspan[0] = s;
                if (e > lastspan[1])
                    lastspan[1] = e;
            }
        }

        inline void Reserve(size_t minsize)
        {
            if (minsize <= allocated_size)
                return;

            int *newdata = new int[minsize];

            memcpy(newdata, data, sizeof(int)*count);

            delete [] data;
            data = newdata;

            allocated_size = minsize;
        }

        inline void SortIntersections()
        {
            assert((count & 1) == 0);
            std::sort(data, data+count, std::less<int>());
            assert((count & 1) == 0);
        }

        inline void Swap(SpanBuffer &other)
        {
            std::swap(data, other.data);
            std::swap(allocated_size, other.allocated_size);
            std::swap(count, other.count);
        }
    };


    struct ShapeWidener {
        // How much to widen in the X direction
        int widen_by;
        // Half of width difference of src and dst (width of the border being produced)
        int xofs;

        // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
        SpanBuffer buffer;

        inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

        ShapeWidener(int _xofs) : xofs(_xofs) { }
    };


    inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
    {
        if (src.GetCount() == 0) return;
        if (src.GetCount() + dst.GetCount() == 0) return;

        assert((src.GetCount() & 1) == 0);
        assert((dst.GetCount() & 1) == 0);

        assert(buffer.GetCount() == 0);

        dst.Swap(buffer);

        const int widen_s = xofs - widen_by;
        const int widen_e = xofs + widen_by;

        size_t resta = src.GetCount()/2;
        size_t restb = buffer.GetCount()/2;
        const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
        const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

        while (resta > 0 || restb > 0)
        {
            if (restb == 0)
            {
                dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
                --resta, ++spa;
            }
            else if (resta == 0)
            {
                dst.MergeOrAddSpan(spb->start, spb->end);
                --restb, ++spb;
            }
            else if (spa->start < spb->start)
            {
                dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
                --resta, ++spa;
            }
            else
            {
                dst.MergeOrAddSpan(spb->start, spb->end);
                --restb, ++spb;
            }
        }

        buffer.Clear();
    }

    如果您最近的实现仍然不够快,您可能最终不得不考虑其他方法。

    此功能的输出用于什么?


    1是正确的-完全排序比合并两个排序的列表要慢。

    因此,您正在查看调整2(或全新的内容)。

    如果将数据结构更改为双向链表,则可以在恒定的工作空间中合并它们。

    为列表节点使用固定大小的堆分配器,既可以减少每个节点的内存使用量,又可以提高节点在内存中相互靠近的机会,从而减少页面遗漏。

    您也许可以在线上或在自己喜欢的算法书中找到代码,以优化链接列表合并。您将需要对此进行自定义,以便在列表合并的同时进行跨度合并。

    为优化合并,首先请注意,对于来自同一侧的值的每次运行而没有来自另一侧的值的运行,您可以一次将整个运行插入dst列表,而不必依次插入每个节点。而且您可以在正常的列表操作中为每次插入节省一次写入操作,只需在结尾处保持"悬挂"即可,因为您知道稍后会对其进行修补。并且只要您不删除应用程序中的其他任何地方,该列表就可以单链接,这意味着每个节点一次写入。

    对于10微秒的运行时间-取决于n和m ...


    我认为严格的线性解决方案是不可能的,因为在最坏的情况下扩宽src向量跨度可能会导致它们全部重叠(取决于您要添加的常数的大小)

    问题可能出在实现中,而不是算法中;我建议为您先前的解决方案分析代码,以了解时间花在哪里

    原因:

    对于运行在3.2GHz的真正"现代" CPU(例如Intel Core 2 Extreme QX9770),可以期望达到约59,455 MIPS。

    对于100,000个向量,您将必须以594,550指令处理每个向量。那是很多指令。

    ref:维基百科MIPS

    此外,请注意,将常量添加到src向量跨度不会对其进行排序,因此您可以独立地标准化src向量跨度,然后将它们与dst矢量跨度合并;这样可以减少原始算法的工作量


    在方法1中提到的排序可以减少为线性时间(从描述的对数线性化),因为两个输入列表已经进行了排序。只需执行合并排序的合并步骤。使用输入跨度矢量的适当表示形式(例如单链接列表),可以就地完成此操作。

    http://en.wikipedia.org/wiki/Merge_sort


    没有重复分配的第二种方法怎么样?换句话说,一次分配您的临时向量,而不再分配它呢?或者,如果输入向量足够小(但不是恒定大小),则只需使用alloca而不是malloc。

    此外,在速度方面,您可能要确保代码使用CMOV进行排序,因为如果代码实际上是为mergesort的每个单次迭代而分支的:

    1
    2
    3
    4
    if(src1[x] < src2[x])
        dst[x] = src1[x];
    else
        dst[x] = src2[x];

    分支预测将在50%的时间内失败,这将对性能造成巨大影响。有条件的移动可能会做得更好,因此请确保编译器正在执行此操作,否则,请尝试诱使它这样做。


    推荐阅读

      wps如何合并章节

      wps如何合并章节,WPS教程,1.WPS表格如何实现单元格合并?WPS 表格在新版本中增加了“合并单元格”系列按钮,同时配有下拉菜单和快捷键。新增

      提高3A四核羿龙II游戏配置的性能

      提高3A四核羿龙II游戏配置的性能,,以节能环保为主题的IT产业,目前3A低端平台处理器、主板芯片组、独立开发卡性能突出,特别是在与AMD的处理

      优化PostgreSQL中的批量更新性能

      优化PostgreSQL中的批量更新性能,数据,表格,在Ubuntu 12.04上使用PG 9.1. 我们目前需要24小时才能运行大量UPDATE数据库上的语句,其形式

      诺基亚威图性能好到哪里

      诺基亚威图性能好到哪里,诺基亚,手机,诺基亚威图性能好到哪里这是一部以前列出的手机。即使当时配置不高,该品牌的手机也不依赖于该功能吸

      魅蓝note6性能参数有哪些

      魅蓝note6性能参数有哪些,摄像头,蓝牙,魅蓝note6性能参数有哪些魅力蓝色Note6最好拍照。电池寿命更长。蓝色Note6使用高通 snapdragon 625

      计算机正常运行中死机故障的解决

      计算机正常运行中死机故障的解决,,通常有三个问题,如黑屏、花屏和蓝屏。 故障1:散热 拆卸机箱、使用皮老虎、冷发等工具来清理机箱内的灰尘

      国产电脑cpu测试|国产CPU性能

      国产电脑cpu测试|国产CPU性能,,国产CPU性能天玑9000答: 天玑9000更厉害。因为天玑9000是 最新发布的cpu,也是现在的天花板。而麒麟9000是 2