65.9K
CodeProject 正在变化。 阅读更多。
Home

对字节数组进行二进制运算,并支持并行和指针

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2012年2月11日

GPL3

5分钟阅读

viewsIcon

36434

对字节数组应用基本的二进制运算(AND、OR、XOR、NOT、ShiftLeft、ShiftRight),通过结合使用并行和指针来提高速度。

引言

Binary Operations 扩展函数应用于字节数组,以提供一种简单且有望快速使用基本二进制运算符的方式。提供了 AND、OR、XOR、NOT、左移、右移等运算符。

提供的函数依赖于 System.Threading.Tasks.Parallel 库以及许多 unsafe 代码块,在这些代码块中使用指针来顺序访问字节数组的元素。

背景

最初,我使用一个简单的 for 循环,对数组中的所有元素重复相同的二进制运算,如下面的语句所示

public static byte[] Bin_And_noParallel_noPointers(this byte[] ba, byte[] bt)
    {
    int longlen = Math.Max(ba.Length, bt.Length);
    int shortlen = Math.Min(ba.Length, bt.Length);
    byte[] result = new byte[longlen];
    for (int i = 0; i < shortlen; i++)
        {
        result[i] = (byte)(ba[i] & bt[i]);
        }
    return result;
    }

然后我脑海中冒出了一个问题:“如何才能让它更快?”之后我尝试了三种解决方案(这些解决方案已在文章末尾的代码中实现):并行处理、使用指针,以及用指向整数的指针替换指向字节的指针。

使用代码

提供的类使用 C# 4.0(在 .NET 环境中)编写。一旦您可以访问其命名空间,此类将通过这六个函数扩展字节数组(byte[])的功能

public static byte[] Bin_And(this byte[] ba, byte[] bt)
public static byte[] Bin_Or(this byte[] ba, byte[] bt)
public static byte[] Bin_Xor(this byte[] ba, byte[] bt)
public static byte[] Bin_Not(this byte[] ba)
public static byte[] Bin_ShiftLeft(this byte[] ba, int bits)
public static byte[] Bin_ShiftRight(this byte[] ba, int bits)

这些函数是扩展方法,并且应按照以下示例中的说明使用,该示例执行了两个字节数组之间的二进制“AND”运算

byte[] MyFirstByteArray, MySecondByteArray;
// fill the arrays here, for example by loading them from files
// (System.IO.File.ReadAllBytes) or filling them manually or with random bytes
byte[] result = MyFirstByteArray.Bin_And(MySecondByteArray);

如果您要尝试这些函数,请记住在 Project -> Properties -> Build 中勾选“Allow unsafe code”复选框,因为指针在 unsafe 代码块中使用...

我通过用文学文本(例如著名意大利诗人但丁·阿利吉耶里创作于 1308 年至 1321 年间的《神曲》,因此不应存在版权问题)填充字节数组进行了这些操作的测试,这些文本长度超过 500,000 字节,并且运行良好。

代码解释

正如我之前提到的,在我改进简单二进制函数的过程中的旅程中,我尝试了三种不同的策略来提高速度:并行处理、指针以及用整数代替字节。

起初,我尝试使用简单的 System.Threading.Tasks.Parallel.For,执行速度有所提高,但幅度不大。

然后我尝试了指针方法:通过索引访问数组(例如,在 MyArray[175] 中)比通过一个顺序从第一个字节移动到最后一个字节的指针访问相同数据要慢。后一种方法取得了不错的结果,与并行方法相当,甚至更快。

那么……为什么不尝试将它们结合起来呢?

嗯,使用 Parallel.For 是个坏主意,因为整体速度下降了很多。

最终的解决方案有点复杂,但速度的提升(尤其是对于大型数组)是值得的:请看下面的“AND”二进制运算的示例代码

public static byte[] Bin_And(this byte[] ba, byte[] bt)
    {
    int lenbig = Math.Max(ba.Length, bt.Length);
    int lensmall = Math.Min(ba.Length, bt.Length);
    byte[] result = new byte[lenbig];
    int ipar = 0;
    object o = new object();
    System.Action paction = delegate()
        {
        int actidx;
        lock (o)
            {
            actidx = ipar++;
            }
        unsafe
            {
            fixed (byte* ptres = result, ptba = ba, ptbt = bt)
                {
                byte* pr = (byte*)ptres;
                byte* pa = (byte*)ptba;
                byte* pt = (byte*)ptbt;
                pr += actidx; pa += actidx; pt += actidx;
                while (pr < ptres + lensmall)
                    {
                    *pr = (byte)(*pt & *pa);
                    pr += paralleldegree; pa += paralleldegree; pt += paralleldegree;
                    }
                }
            }
        };

    System.Action[] actions = new Action[paralleldegree];
    for (int i = 0; i < paralleldegree; i++)
        {
        actions[i] = paction;
        }
    Parallel.Invoke(actions);
    return result;
    }

在此函数中,声明了一个委托(包含执行 AND 运算的代码),然后该委托被调用次数等于 paralleldegree 变量(这是一个类级别的变量,在静态构造函数中赋值(您可以在文章末尾的完整类代码中看到),它包含当前机器可用的处理器数量)。委托的各个实例当然是以并行方式调用的。

在委托内部,声明了指向输入和输出字节数组的指针,然后,在 while 循环内,它们在每次循环时移动到下一个元素。函数的核心是简单的行 *pr = (*pt & *pa);,其中输入值(*pt*pa)之间的 AND 二进制运算被执行,并将结果放入输出(*pr)。

现在,如果 paralleldegree 为 1(即,没有并行),这个委托就很容易理解。

但是,假设,例如,paralleldegree 为 4。这就导致问题,即四个委托实例会执行四次完全相同的代码,导致执行时间非常慢。因此,当 paralleldegree 为 4(或任何大于 1 的数字)时,想法是让每个委托从输入字节数组的不同字节开始,然后,在 while 循环内,每次循环跳到下一个 4 个字节,从而在不与其他并行运行的委托发生冲突的情况下计算结果。

在这种情况下,第一个委托将计算字节 0、4、8、12 等;第二个委托将计算字节 1、5、9、13 等,第三个 2、6、10、14……,第四个 3、7、11、15……。开头的 lock 语句正是为了让委托从顺序点开始,这要归功于 ipar 变量(函数级别),它在每次调用时递增(当 paralleldegree 为 4 时递增四次),以及 actidx 变量(委托级别),它只分配一次(对于每个委托),并且在某种程度上充当每个委托的指针的“初始偏移量”(正如在行 pr += actidx; pa += actidx; pt += actidx; 中可以看到的)。

将并行处理与指针结合使用,极大地提高了执行速度(顺便说一句:为了分析执行速度,我过去会将要测试的函数放在一个循环中,执行 100 或 1000 次,并使用 System.Diagnostics.Stopwatch 类测量时间);但是,还可以进一步改进,即用 uint 指针(uint*)替换字节指针(byte*):uintbyte 大四倍,所以每次循环会缩短四倍。嗯,速度再次提高;不是四倍,而是“只”提高了两倍。因此,在我看来,最后的改进是值得的。

完整的类代码

using System;
using System.Threading.Tasks;

namespace MySpace
    {
    public static class BinOps
        {
        private const int bitsinbyte = 8;
        private static readonly int paralleldegree;
        private static readonly int uintsize;
        private static readonly int bitsinuint;
        static BinOps()
            {
            paralleldegree = Environment.ProcessorCount;
            uintsize = sizeof(uint) / sizeof(byte); // really paranoid, uh ?
            bitsinuint = uintsize * bitsinbyte;
            }

        public static byte[] Bin_And(this byte[] ba, byte[] bt)
            {
            int lenbig = Math.Max(ba.Length, bt.Length);
            int lensmall = Math.Min(ba.Length, bt.Length);
            byte[] result = new byte[lenbig];
            int ipar = 0;
            object o = new object();
            System.Action paction = delegate()
            {
                int actidx;
                lock (o)
                    {
                    actidx = ipar++;
                    }
                unsafe
                    {
                    fixed (byte* ptres = result, ptba = ba, ptbt = bt)
                        {
                        uint* pr = (uint*)ptres;
                        uint* pa = (uint*)ptba;
                        uint* pt = (uint*)ptbt;
                        pr += actidx; pa += actidx; pt += actidx;
                        while (pr < ptres + lensmall)
                            {
                            *pr = (*pt & *pa);
                            pr += paralleldegree; pa += paralleldegree; pt += paralleldegree;
                            }
                        }
                    }
            };
            System.Action[] actions = new Action[paralleldegree];
            for (int i = 0; i < paralleldegree; i++)
                {
                actions[i] = paction;
                }
            Parallel.Invoke(actions);
            return result;
            }
            
        public static byte[] Bin_Or(this byte[] ba, byte[] bt)
            {
            int lenbig = Math.Max(ba.Length, bt.Length);
            int lensmall = Math.Min(ba.Length, bt.Length);
            byte[] result = new byte[lenbig];
            int ipar = 0;
            object o = new object();
            System.Action paction = delegate()
                {
                    int actidx;
                    lock (o)
                        {
                        actidx = ipar++;
                        }
                    unsafe
                        {
                        fixed (byte* ptres = result, ptba = ba, ptbt = bt)
                            {
                            uint* pr = (uint*)ptres;
                            uint* pa = (uint*)ptba;
                            uint* pt = (uint*)ptbt;
                            pr += actidx; pa += actidx; pt += actidx;
                            while (pr < ptres + lensmall)
                                {
                                *pr = (*pt | *pa);
                                pr += paralleldegree; pa += paralleldegree; pt += paralleldegree;
                                }
                            uint* pl = ba.Length > bt.Length ? pa : pt;
                            while (pr < ptres + lenbig)
                                {
                                *pr = *pl;
                                pr += paralleldegree; pl += paralleldegree;
                                }
                            }
                        }
                };
            System.Action[] actions = new Action[paralleldegree];
            for (int i = 0; i < paralleldegree; i++)
                {
                actions[i] = paction;
                }
            Parallel.Invoke(actions);

            return result;
            }
            
        public static byte[] Bin_Xor(this byte[] ba, byte[] bt)
            {
            int lenbig = Math.Max(ba.Length, bt.Length);
            int lensmall = Math.Min(ba.Length, bt.Length);
            byte[] result = new byte[lenbig];
            int ipar = 0;
            object o = new object();
            System.Action paction = delegate()
                {
                    int actidx;
                    lock (o)
                        {
                        actidx = ipar++;
                        }
                    unsafe
                        {
                        fixed (byte* ptres = result, ptba = ba, ptbt = bt)
                            {
                            uint* pr = ((uint*)ptres) + actidx;
                            uint* pa = ((uint*)ptba) + actidx;
                            uint* pt = ((uint*)ptbt) + actidx;
                            while (pr < ptres + lensmall)
                                {
                                *pr = (*pt ^ *pa);
                                pr += paralleldegree; pa += paralleldegree; pt += paralleldegree;
                                }
                            uint* pl = ba.Length > bt.Length ? pa : pt;
                            while (pr < ptres + lenbig)
                                {
                                *pr = *pl;
                                pr += paralleldegree; pl += paralleldegree;
                                }
                            }
                        }
                };
            System.Action[] actions = new Action[paralleldegree];
            for (int i = 0; i < paralleldegree; i++)
                {
                actions[i] = paction;
                }
            Parallel.Invoke(actions);

            return result;
            }
            
        public static byte[] Bin_Not(this byte[] ba)
            {
            int len = ba.Length;
            byte[] result = new byte[len];
            
            int ipar = 0;
            object o = new object();
            System.Action paction = delegate()
            {
                int actidx;
                lock (o)
                    {
                    actidx = ipar++;
                    }
                unsafe
                    {
                    fixed (byte* ptres = result, ptba = ba)
                        {
                        uint* pr = (uint*)ptres;
                        uint* pa = (uint*)ptba;
                        pr += actidx; pa += actidx;
                        while (pr < ptres + len)
                            {
                            *pr = ~(*pa);
                            pr += paralleldegree; pa += paralleldegree;
                            }
                        }
                    }
            };
            System.Action[] actions = new Action[paralleldegree];
            for (int i = 0; i < paralleldegree; i++)
                {
                actions[i] = paction;
                }
            Parallel.Invoke(actions);
            return result;
            }
            
        public static byte[] Bin_ShiftLeft(this byte[] ba, int bits)
            {
            int ipar = 0;
            object o = new object();

            int len = ba.Length;
            if (bits >= len * bitsinbyte) return new byte[len];
            int shiftbits = bits % bitsinuint;
            int shiftuints = bits / bitsinuint;
            byte[] result = new byte[len];

            if (len > 1)
                {
                // first uint is shifted without carry from previous byte (previous byte does not exist)
                unsafe
                    {
                    fixed (byte* fpba = ba, fpres = result)
                        {
                        uint* pres = (uint*)fpres + shiftuints;
                        uint* pba = (uint*)fpba;
                        *pres = *pba << shiftbits;
                        }
                    }
                System.Action paction = delegate()
                    {
                        int actidx;
                        lock (o)
                            {
                            actidx = ipar++;
                            }
                        unsafe
                            {
                            fixed (byte* fpba = ba, fpres = result)
                                {
                                // pointer to results; shift the bytes in the result
                                // (i.e. move left the pointer to the result)
                                uint* pres = (uint*)fpres + shiftuints + actidx + 1;
                                // pointer to original data, second byte
                                uint* pba1 = (uint*)fpba + actidx + 1;
                                if (shiftbits == 0)
                                    {
                                    while (pres < fpres + len)
                                        {
                                        *pres = *pba1;
                                        pres += paralleldegree; pba1 += paralleldegree;
                                        }
                                    }
                                else
                                    {
                                    // pointer to original data, first byte
                                    uint* pba2 = (uint*)fpba + actidx;
                                    while (pres < fpres + len)
                                        {
                                        *pres = *pba2 >> (bitsinuint - shiftbits) | *pba1 << shiftbits;
                                        pres += paralleldegree; pba1 += paralleldegree; pba2 += paralleldegree;
                                        }
                                    }
                                }
                            };

                    };
                System.Action[] actions = new Action[paralleldegree];
                for (int i = 0; i < paralleldegree; i++)
                    {
                    actions[i] = paction;
                    }
                Parallel.Invoke(actions);
                }

            return result;
            }
            
        public static byte[] Bin_ShiftRight(this byte[] ba, int bits)
            {
            int ipar = 0;
            object o = new object();
            int len = ba.Length;
            if (bits >= len * bitsinbyte) return new byte[len];
            int ulen = len / uintsize + 1 - (uintsize - (len % uintsize)) / uintsize;
            int shiftbits = bits % bitsinuint;
            int shiftuints = bits / bitsinuint;
            byte[] result = new byte[len];

            if (len > 1)
                {
                unsafe
                    {
                    fixed (byte* fpba = ba, fpres = result)
                        {
                        uint* pres = (uint*)fpres + ulen - shiftuints - 1;
                        uint* pba = (uint*)fpba + ulen - 1;
                        *pres = *pba >> shiftbits;
                        }
                    }
                System.Action paction = delegate()
                    {
                        int actidx;
                        lock (o)
                            {
                            actidx = ipar++;
                            }
                        unsafe
                            {
                            fixed (byte* fpba = ba, fpres = result)
                                {
                                // pointer to results; shift the bytes in the result
                                // (i.e. move left the pointer to the result)
                                uint* pres = (uint*)fpres + actidx;
                                // pointer to original data, first useful byte
                                uint* pba1 = (uint*)fpba + shiftuints + actidx;
                                if (shiftbits == 0)
                                    {
                                    while (pres < ((uint*)fpres) + ulen - shiftuints - 1)
                                        {
                                        *pres = *pba1;
                                        // increment pointers to next position
                                        pres += paralleldegree; pba1 += paralleldegree;
                                        }
                                    }
                                else
                                    {
                                    // pointer to original data, second useful byte
                                    uint* pba2 = (uint*)fpba + shiftuints + actidx + 1;
                                    while (pres < ((uint*)fpres) + ulen - shiftuints - 1)
                                        {
                                        // Core shift operation
                                        *pres = (*pba1 >> shiftbits | *pba2 << (bitsinuint - shiftbits));
                                        // increment pointers to next position
                                        pres += paralleldegree; pba1 += paralleldegree; pba2 += paralleldegree;
                                        }
                                    }
                                }
                            };
                    };
                System.Action[] actions = new Action[paralleldegree];
                for (int i = 0; i < paralleldegree; i++)
                    {
                    actions[i] = paction;
                    }
                Parallel.Invoke(actions);
                }
            return result;
            }
        }
    }
© . All rights reserved.