编码中代码优化

起因

因为订阅.Net项目邮件,关注在.Net 6状态的更新,也会看一下自己感兴趣的代码.看大神是如何进行编写代码.

本篇尽量写较语言通用,而不仅是在.Net平台的代码优化.以红开头的节为通用的.以绿开头的仅为.Net(C#)独有的.

本篇是总结的文章,需要长期的进行补充.

1. 代码循环展开

循环展开,就是减少循环(迭代)的次数,增加每次循环内的计算量.

[Benchmark]
public void T1()
{
    long sum = 0;
    int count = 8 * 4096 * 1024;
    for (int i = 0; i < count; i++)
    {
        sum += i;
    }
}

[Benchmark]
public void T2()
{
    long sum = 0;
    int count = 8 * 4096 * 1024;
    for (int i = 0; i < count; i += 4)
    {
        sum += i;
        sum += i + 1;
        sum += i + 2;
        sum += i + 3;
    }
}

在T2通过减少循环次数,增加单次循环内的计算,我们看一下结果

循环展开,代码性能对比结果

上面的代码只是简单说循环展开,下面我们看一下在c#中,如何使用循环展开进行代码优化(在Span中Fill方法源码)
/// <summary>
/// Fills the contents of this span with the given value.
/// </summary>
public void Fill(T value)
{
    if (Unsafe.SizeOf<T>() == 1)
    {
        uint length = (uint)_length;
        if (length == 0)
            return;

        T tmp = value; // Avoid taking address of the "value" argument. It would regress performance of the loop below.
        Unsafe.InitBlockUnaligned(ref Unsafe.As<T, byte>(ref _pointer.Value), Unsafe.As<T, byte>(ref tmp), length);
    }
    else
    {
        // Do all math as nuint to avoid unnecessary 64->32->64 bit integer truncations
        nuint length = (uint)_length;
        if (length == 0)
            return;

        ref T r = ref _pointer.Value;

        // TODO: Create block fill for value types of power of two sizes e.g. 2,4,8,16

        nuint elementSize = (uint)Unsafe.SizeOf<T>();
        nuint i = 0;
        for (; i < (length & ~(nuint)7); i += 8)  //1.执行8的倍数 8 16
        {
            Unsafe.AddByteOffset<T>(ref r, (i + 0) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 1) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 2) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 3) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 4) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 5) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 6) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 7) * elementSize) = value;
        }
        if (i < (length & ~(nuint)3))             //2.执行8以内,为4的一部分
        {
            Unsafe.AddByteOffset<T>(ref r, (i + 0) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 1) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 2) * elementSize) = value;
            Unsafe.AddByteOffset<T>(ref r, (i + 3) * elementSize) = value;
            i += 4;
        }
        for (; i < length; i++)                   //3.执行4以内的次数
        {
            Unsafe.AddByteOffset<T>(ref r, i * elementSize) = value;
        }

        /*
            * 以length为15为例   8 4 3
            * 1. for执行1次,1次执行8位 
            * 2. 执行4位
            * 3. for循环执行3次
            */

        /*
            * 以length为20为例   16 4 0
            * 1. for执行2次,1次执行8位 
            * 2. 执行4位
            * 3. 不执行
            */

        /*
            * 以length为24为例   24  0 0
            * 1. for执行3次,1次执行8位 
            * 2. 不执行
            * 3. 不执行
            */
    }
}


2. 使用栈内存

栈的性能比堆的性能更好,而且资源是自动管理(离开所在作用域自动释放),听起来栈是完美的,但栈的空间较小,所以在需要性能和减少GC压力的时候,是可以直接在栈分配资源,进行重复使用的.在Rust语言中,资源(对象)是默认分配在栈上,只有使用Box才会分配在堆上.

[Benchmark]
public void StackTest()
{
    Span<int> arr = stackalloc int[1024];
    for (int i = 0; i < arr.Length; i++)
    {
        arr[i] = 0;
    }
}

[Benchmark]
public void HeapTest()
{
    int[] arr = new int[1024];
    for (int i = 0; i < arr.Length; i++)
    {
        arr[i] = i;
    }
}

在栈和堆分配内存性能对比

测试结果得出:使用栈比堆性能更好,无论是在执行效率和GC这一块,栈是完胜堆的.

在.Net SocketPal.Windows.cs源码中:

public static SocketError Send(SafeSocketHandle handle, IList<ArraySegment<byte>> buffers, SocketFlags socketFlags, out int bytesTransferred)
{
    const int StackThreshold = 16; // arbitrary limit to avoid too much space on stack (note: may be over-sized, that's OK - length passed separately)
    int count = buffers.Count;
    bool useStack = count <= StackThreshold; //栈空间比较小,判断是否小于或等于16,才会使用栈

    WSABuffer[]? leasedWSA = null;
    GCHandle[]? leasedGC = null;
    Span<WSABuffer> WSABuffers = stackalloc WSABuffer[0];
    Span<GCHandle> objectsToPin = stackalloc GCHandle[0];
    if (useStack)                            //使用栈
    {
        WSABuffers = stackalloc WSABuffer[StackThreshold];
        objectsToPin = stackalloc GCHandle[StackThreshold];
    }
    else                                     //数量较大时,使用池化技术
    {
        WSABuffers = leasedWSA = ArrayPool<WSABuffer>.Shared.Rent(count);
        objectsToPin = leasedGC = ArrayPool<GCHandle>.Shared.Rent(count);
    }
    objectsToPin = objectsToPin.Slice(0, count);
    objectsToPin.Clear(); // note: touched in finally
    //省略一些代码
}


3. 使用池化技术

因为在Socket源码看到使用了池化技术,池化是复用资源(如空间/对象,在有GC的语言是减少资源的分配,就是减少GC回收的次数,降低Full GC的几率)来提高执行效率,从而达到性能的提升.
常见的池化技术:
常见的池化技术,如线程池/内存池/连接池等
惯例来个简单的示例:
[Benchmark]
public void PoolTest()
{
    int[] arr = ArrayPool<int>.Shared.Rent(1024);   //借用内存空间
    for (int i = 0; i < arr.Length; i++)
    {
        arr[i] = i;
    }
    ArrayPool<int>.Shared.Return(arr);              //归还内存空间
}

和栈示例的代码进行性能对比:

使用池化技术,对比直接在堆上,性能对比

得出结果: 使用池化后,没有进行GC回收,执行时间比直接分配堆上要快很多,和使用栈性能相差不大,甚至还略好于使用栈.上面只是使用ArrayPool,没有介绍MemoryPool,用法相差不大.

4. 使用cpu指令集优化

在C/C++中要针对指令,进行引入头文件.在C#中是对指令进行抽象,当然不同的体系CPU要引入不同的命名空间.看在.Net 6中对Span的Fill的方法,进行了优化.

/// <summary>
/// 在.Net 6 Span Fill加入指令进行性能优化,再不能支持硬件加速的情况,还是继续使用循环展开的方式
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="refData"></param>
/// <param name="numElements"></param>
/// <param name="value"></param>
public static void Fill<T>(ref T refData, nuint numElements, T value)
{
    // Early checks to see if it's even possible to vectorize - JIT will turn these checks into consts.
    // -1. T cannot contain references (GC can't track references in vectors)
    // -2. Vectorization must be hardware-accelerated
    // -3. T's size must not exceed the vector's size
    // -4. T's size must be a whole power of 2

    //CannotVectorize 是针对不能使用指令进行优化的情况

    //T 是引用类型,在该类型中包含其他引用类型,GC不支持Vectors中包含其他引用,对应上边的1
    if (RuntimeHelpers.IsReferenceOrContainsReferences<T>()) { goto CannotVectorize; }

    //硬件不支持指令集优化.对应上边2
    if (!Vector.IsHardwareAccelerated) { goto CannotVectorize; }

    //T类型不能大小,不能大于32 (Vector<byte>.Count 为32) 对应上边的3
    if (Unsafe.SizeOf<T>() > Vector<byte>.Count) { goto CannotVectorize; }

    //不是2的幂
    if (!BitOperations.IsPow2(Unsafe.SizeOf<T>())) { goto CannotVectorize; }

    //使用指令优化
    if (numElements >= (uint)(Vector<byte>.Count / Unsafe.SizeOf<T>()))
    {
        // We have enough data for at least one vectorized write.

        T tmp = value; // Avoid taking address of the "value" argument. It would regress performance of the loops below.
        Vector<byte> vector;

        if (Unsafe.SizeOf<T>() == 1)
        {
            vector = new Vector<byte>(Unsafe.As<T, byte>(ref tmp));
        }
        else if (Unsafe.SizeOf<T>() == 2)
        {
            vector = (Vector<byte>)(new Vector<ushort>(Unsafe.As<T, ushort>(ref tmp)));
        }
        else if (Unsafe.SizeOf<T>() == 4)
        {
            // special-case float since it's already passed in a SIMD reg
            vector = (typeof(T) == typeof(float))
                ? (Vector<byte>)(new Vector<float>((float)(object)tmp!))
                : (Vector<byte>)(new Vector<uint>(Unsafe.As<T, uint>(ref tmp)));
        }
        else if (Unsafe.SizeOf<T>() == 8)
        {
            // special-case double since it's already passed in a SIMD reg
            vector = (typeof(T) == typeof(double))
                ? (Vector<byte>)(new Vector<double>((double)(object)tmp!))
                : (Vector<byte>)(new Vector<ulong>(Unsafe.As<T, ulong>(ref tmp)));
        }
        else if (Unsafe.SizeOf<T>() == 16)
        {
            Vector128<byte> vec128 = Unsafe.As<T, Vector128<byte>>(ref tmp);
            if (Vector<byte>.Count == 16)
            {
                vector = vec128.AsVector();
            }
            else if (Vector<byte>.Count == 32)
            {
                vector = Vector256.Create(vec128, vec128).AsVector();
            }
            else
            {
                Debug.Fail("Vector<T> isn't 128 or 256 bits in size?");
                goto CannotVectorize;
            }
        }
        else if (Unsafe.SizeOf<T>() == 32)
        {
            if (Vector<byte>.Count == 32)
            {
                vector = Unsafe.As<T, Vector256<byte>>(ref tmp).AsVector();
            }
            else
            {
                Debug.Fail("Vector<T> isn't 256 bits in size?");
                goto CannotVectorize;
            }
        }
        else
        {
            Debug.Fail("Vector<T> is greater than 256 bits in size?");
            goto CannotVectorize;
        }

        ref byte refDataAsBytes = ref Unsafe.As<T, byte>(ref refData);
        nuint totalByteLength = numElements * (nuint)Unsafe.SizeOf<T>(); // get this calculation ready ahead of time
        nuint stopLoopAtOffset = totalByteLength & (nuint)(nint)(2 * (int)-Vector<byte>.Count); // intentional sign extension carries the negative bit
        nuint offset = 0;

        // Loop, writing 2 vectors at a time.
        // Compare 'numElements' rather than 'stopLoopAtOffset' because we don't want a dependency
        // on the very recently calculated 'stopLoopAtOffset' value.

        if (numElements >= (uint)(2 * Vector<byte>.Count / Unsafe.SizeOf<T>()))
        {
            do
            {
                Unsafe.WriteUnaligned(ref Unsafe.AddByteOffset(ref refDataAsBytes, offset), vector);
                Unsafe.WriteUnaligned(ref Unsafe.AddByteOffset(ref refDataAsBytes, offset + (nuint)Vector<byte>.Count), vector);
                offset += (uint)(2 * Vector<byte>.Count);
            } while (offset < stopLoopAtOffset);
        }

        // At this point, if any data remains to be written, it's strictly less than
        // 2 * sizeof(Vector) bytes. The loop above had us write an even number of vectors.
        // If the total byte length instead involves us writing an odd number of vectors, write
        // one additional vector now. The bit check below tells us if we're in an "odd vector
        // count" situation.

        if ((totalByteLength & (nuint)Vector<byte>.Count) != 0)
        {
            Unsafe.WriteUnaligned(ref Unsafe.AddByteOffset(ref refDataAsBytes, offset), vector);
        }

        // It's possible that some small buffer remains to be populated - something that won't
        // fit an entire vector's worth of data. Instead of falling back to a loop, we'll write
        // a vector at the very end of the buffer. This may involve overwriting previously
        // populated data, which is fine since we're splatting the same value for all entries.
        // There's no need to perform a length check here because we already performed this
        // check before entering the vectorized code path.

        Unsafe.WriteUnaligned(ref Unsafe.AddByteOffset(ref refDataAsBytes, totalByteLength - (nuint)Vector<byte>.Count), vector);

        // And we're done!

        return;
    }

CannotVectorize:

    // If we reached this point, we cannot vectorize this T, or there are too few
    // elements for us to vectorize. Fall back to an unrolled loop.

    nuint i = 0;

    // Write 8 elements at a time

    if (numElements >= 8)
    {
        nuint stopLoopAtOffset = numElements & ~(nuint)7;
        do
        {
            Unsafe.Add(ref refData, (nint)i + 0) = value;
            Unsafe.Add(ref refData, (nint)i + 1) = value;
            Unsafe.Add(ref refData, (nint)i + 2) = value;
            Unsafe.Add(ref refData, (nint)i + 3) = value;
            Unsafe.Add(ref refData, (nint)i + 4) = value;
            Unsafe.Add(ref refData, (nint)i + 5) = value;
            Unsafe.Add(ref refData, (nint)i + 6) = value;
            Unsafe.Add(ref refData, (nint)i + 7) = value;
        } while ((i += 8) < stopLoopAtOffset);
    }

    // Write next 4 elements if needed

    if ((numElements & 4) != 0)
    {
        Unsafe.Add(ref refData, (nint)i + 0) = value;
        Unsafe.Add(ref refData, (nint)i + 1) = value;
        Unsafe.Add(ref refData, (nint)i + 2) = value;
        Unsafe.Add(ref refData, (nint)i + 3) = value;
        i += 4;
    }

    // Write next 2 elements if needed

    if ((numElements & 2) != 0)
    {
        Unsafe.Add(ref refData, (nint)i + 0) = value;
        Unsafe.Add(ref refData, (nint)i + 1) = value;
        i += 2;
    }

    // Write final element if needed

    if ((numElements & 1) != 0)
    {
        Unsafe.Add(ref refData, (nint)i) = value;
    }
}


秋风 2021-03-13