我需要一个非常快速的算法来完成以下任务。我已经实现了几种算法来完成它,但是它们对于我所需的性能来说太慢了。它应该足够快,以使算法可以在现代CPU上每秒至少运行100,000次。它将在C中实现。
我正在使用跨度/范围,这是一种在一条线上具有起点和终点坐标的结构。
我有两个跨度的向量(动态数组),我需要将它们合并。一个向量是src,另一向量是dst。向量按跨度开始坐标排序,并且跨度在一个向量内不重叠。
必须将src向量中的跨度与dst向量中的跨度合并,以使所得向量仍被排序且没有重叠。 IE。如果在合并过程中检测到重叠,则将两个跨度合并为一个。 (合并两个跨度仅是更改结构中的坐标的问题。)
现在,还有一个问题,在合并过程中,必须"扩大" src向量中的跨度。这意味着一个常量将被添加到src中每个跨度的开始坐标,另一个(较大)常量被添加到结束坐标。这意味着在src跨度扩大后,它们可能会重叠。
到目前为止,我得出的结论是它不能完全就地完成,需要某种临时存储。我认为它应该在线性时间内超过src和dst元素的总和是可行的。
该算法的多次运行之间可能共享任何临时存储。
我尝试过的两种主要方法(太慢了)是:
将src的所有元素附加到dst,然后在附加每个元素之前将其扩展。然后运行就地排序。最后,使用"读"和"写"指针对结果向量进行迭代,使读指针在写指针之前运行,并合并跨度。当所有元素都已合并(读取指针到达末尾)时,dst将被截断。
创建一个临时工作向量。如上所述,通过反复从src或dst中选取下一个元素并合并到工作向量中,进行幼稚的合并。完成后,将工作向量复制到dst,以替换它。
第一种方法的问题是排序是O((m n)* log(m n))而不是O(m n),并且有一些开销。这也意味着dst向量必须增长得比实际需要大得多。
第二个主要问题是大量复制并再次分配/取消分配内存。
如果需要,可以更改用于存储/管理跨度/矢量的数据结构。
更新:忘记说数据集有多大。在两个向量中,最常见的情况是在4到30个元素之间,并且dst为空或src和dst的跨度之间存在大量重叠。
我们知道,绝对最佳的运行时是O(m n),这是因为您至少必须扫描所有数据才能合并列表。鉴于此,您的第二种方法应为您提供这种类型的行为。
您是否介绍了第二种方法来找出瓶颈?实际上,根据您正在谈论的数据量,实际上不可能在指定的时间内完成您想要的操作。一种验证方法是做一些简单的事情,例如对循环中每个向量中跨度的所有开始和结束值求和,并对时间进行计时。基本上,这里您对向量中的每个元素都进行了最少的工作。这将为您提供可望获得的最佳性能的基准。
此外,您可以避免使用stl swap方法逐个复制vectors,并且可以将temp向量预分配为一定大小,以避免合并元素时触发数组扩展。铅>
您可能会考虑在系统中使用2个向量,并且每当需要进行合并时,都将合并到未使用的向量中,然后交换(这类似于图形中使用的双缓冲)。这样,您不必在每次合并时都重新分配向量。
但是,最好先进行概要分析,然后找出瓶颈。如果与实际的合并过程相比分配量很少,那么您需要弄清楚如何更快地进行分配。
直接访问向量原始数据可能会带来一些额外的提速,从而避免了每次访问数据时的边界检查。
您的目标系统是什么?是多核吗?如果是这样,您可以考虑对该算法进行多线程处理
我将始终对跨度矢量进行排序。这使得实现算法更容易-并且可以在线性时间内完成。
好,所以我根据以下内容对跨度进行排序:
您需要创建一个函数来执行此操作。
然后,我将使用std :: set_union合并向量(在继续之前,您可以合并多个向量)。
然后对于具有相同最小值的每个连续范围的跨度,保留第一个范围并删除其余部分(它们是第一个范围的子范围)。
然后,您需要合并跨度。现在应该可以在线性时间内实现了。
好的,这就是窍门。不要尝试就地执行此操作。使用一个或多个临时向量(并提前保留足够的空间)。然后最后调用std :: vector :: swap将结果放入您选择的输入向量中。
我希望这足以使您前进。
我专门针对此算法编写了一个新的容器类,以适应需要。这也使我有机会调整程序周围的其他代码,这些代码同时提高了一点速度。
这比使用STL向量的旧实现要快得多,但在其他方面基本上是相同的。但是,虽然速度更快,但仍然还不够快...不幸的是。
分析不再显示什么是真正的瓶颈。 MSVC探查器有时有时会把错误归咎于错误的调用(假设相同的运行分配了截然不同的运行时间),并且大多数调用已合并为一个大问题。
查看生成的代码的反汇编表明,生成的代码中存在大量跳跃,我认为这可能是现在速度缓慢的主要原因。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
| class SpanBuffer {
private:
int *data;
size_t allocated_size;
size_t count;
inline void EnsureSpace()
{
if (count == allocated_size)
Reserve(count*2);
}
public:
struct Span {
int start, end;
};
public:
SpanBuffer()
: data(0)
, allocated_size(24)
, count(0)
{
data = new int[allocated_size];
}
SpanBuffer(const SpanBuffer &src)
: data(0)
, allocated_size(src.allocated_size)
, count(src.count)
{
data = new int[allocated_size];
memcpy(data, src.data, sizeof(int)*count);
}
~SpanBuffer()
{
delete [] data;
}
inline void AddIntersection(int x)
{
EnsureSpace();
data[count++] = x;
}
inline void AddSpan(int s, int e)
{
assert((count & 1) == 0);
assert(s >= 0);
assert(e >= 0);
EnsureSpace();
data[count] = s;
data[count+1] = e;
count += 2;
}
inline void Clear()
{
count = 0;
}
inline size_t GetCount() const
{
return count;
}
inline int GetIntersection(size_t i) const
{
return data[i];
}
inline const Span * GetSpanIteratorBegin() const
{
assert((count & 1) == 0);
return reinterpret_cast<const Span *>(data);
}
inline Span * GetSpanIteratorBegin()
{
assert((count & 1) == 0);
return reinterpret_cast<Span *>(data);
}
inline const Span * GetSpanIteratorEnd() const
{
assert((count & 1) == 0);
return reinterpret_cast<const Span *>(data+count);
}
inline Span * GetSpanIteratorEnd()
{
assert((count & 1) == 0);
return reinterpret_cast<Span *>(data+count);
}
inline void MergeOrAddSpan(int s, int e)
{
assert((count & 1) == 0);
assert(s >= 0);
assert(e >= 0);
if (count == 0)
{
AddSpan(s, e);
return;
}
int *lastspan = data + count-2;
if (s > lastspan[1])
{
AddSpan(s, e);
}
else
{
if (s < lastspan[0])
lastspan[0] = s;
if (e > lastspan[1])
lastspan[1] = e;
}
}
inline void Reserve(size_t minsize)
{
if (minsize <= allocated_size)
return;
int *newdata = new int[minsize];
memcpy(newdata, data, sizeof(int)*count);
delete [] data;
data = newdata;
allocated_size = minsize;
}
inline void SortIntersections()
{
assert((count & 1) == 0);
std::sort(data, data+count, std::less<int>());
assert((count & 1) == 0);
}
inline void Swap(SpanBuffer &other)
{
std::swap(data, other.data);
std::swap(allocated_size, other.allocated_size);
std::swap(count, other.count);
}
};
struct ShapeWidener {
// How much to widen in the X direction
int widen_by;
// Half of width difference of src and dst (width of the border being produced)
int xofs;
// Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
SpanBuffer buffer;
inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);
ShapeWidener(int _xofs) : xofs(_xofs) { }
};
inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
if (src.GetCount() == 0) return;
if (src.GetCount() + dst.GetCount() == 0) return;
assert((src.GetCount() & 1) == 0);
assert((dst.GetCount() & 1) == 0);
assert(buffer.GetCount() == 0);
dst.Swap(buffer);
const int widen_s = xofs - widen_by;
const int widen_e = xofs + widen_by;
size_t resta = src.GetCount()/2;
size_t restb = buffer.GetCount()/2;
const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();
while (resta > 0 || restb > 0)
{
if (restb == 0)
{
dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
--resta, ++spa;
}
else if (resta == 0)
{
dst.MergeOrAddSpan(spb->start, spb->end);
--restb, ++spb;
}
else if (spa->start < spb->start)
{
dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
--resta, ++spa;
}
else
{
dst.MergeOrAddSpan(spb->start, spb->end);
--restb, ++spb;
}
}
buffer.Clear();
} |
如果您最近的实现仍然不够快,您可能最终不得不考虑其他方法。
此功能的输出用于什么?
1是正确的-完全排序比合并两个排序的列表要慢。
因此,您正在查看调整2(或全新的内容)。
如果将数据结构更改为双向链表,则可以在恒定的工作空间中合并它们。
为列表节点使用固定大小的堆分配器,既可以减少每个节点的内存使用量,又可以提高节点在内存中相互靠近的机会,从而减少页面遗漏。
您也许可以在线上或在自己喜欢的算法书中找到代码,以优化链接列表合并。您将需要对此进行自定义,以便在列表合并的同时进行跨度合并。
为优化合并,首先请注意,对于来自同一侧的值的每次运行而没有来自另一侧的值的运行,您可以一次将整个运行插入dst列表,而不必依次插入每个节点。而且您可以在正常的列表操作中为每次插入节省一次写入操作,只需在结尾处保持"悬挂"即可,因为您知道稍后会对其进行修补。并且只要您不删除应用程序中的其他任何地方,该列表就可以单链接,这意味着每个节点一次写入。
对于10微秒的运行时间-取决于n和m ...
我认为严格的线性解决方案是不可能的,因为在最坏的情况下扩宽src向量跨度可能会导致它们全部重叠(取决于您要添加的常数的大小)
问题可能出在实现中,而不是算法中;我建议为您先前的解决方案分析代码,以了解时间花在哪里
原因:
对于运行在3.2GHz的真正"现代" CPU(例如Intel Core 2 Extreme QX9770),可以期望达到约59,455 MIPS。
对于100,000个向量,您将必须以594,550指令处理每个向量。那是很多指令。
ref:维基百科MIPS
此外,请注意,将常量添加到src向量跨度不会对其进行排序,因此您可以独立地标准化src向量跨度,然后将它们与dst矢量跨度合并;这样可以减少原始算法的工作量
在方法1中提到的排序可以减少为线性时间(从描述的对数线性化),因为两个输入列表已经进行了排序。只需执行合并排序的合并步骤。使用输入跨度矢量的适当表示形式(例如单链接列表),可以就地完成此操作。
http://en.wikipedia.org/wiki/Merge_sort
没有重复分配的第二种方法怎么样?换句话说,一次分配您的临时向量,而不再分配它呢?或者,如果输入向量足够小(但不是恒定大小),则只需使用alloca而不是malloc。
此外,在速度方面,您可能要确保代码使用CMOV进行排序,因为如果代码实际上是为mergesort的每个单次迭代而分支的:
1 2 3 4
| if(src1[x] < src2[x])
dst[x] = src1[x];
else
dst[x] = src2[x]; |
分支预测将在50%的时间内失败,这将对性能造成巨大影响。有条件的移动可能会做得更好,因此请确保编译器正在执行此操作,否则,请尝试诱使它这样做。