TSP问题描述:
TSP问题的动态规划解法:
引用一下这篇文章,觉得作者把动态规划算法讲的非常明白:https://blog.csdn.net/derek_tian/article/details/45057443
动态规划算法的并行:
终于到了文章的重点,如何高效率的将主问题拆开并分配给各个线程运算是值得研究的话题。这里先引用动态规划算法的示意图:
(图2)
图中所示的是城市数为4的问题,从第一行到第二行,把问题拆分成了三个子问题,这三个子问题互不影响互不相关。将这三个子问题再一次拆分,每个问题又能拆出两个问题。将这个规律推广到n,城市数为n的问题通过第一次拆分可以变为n-1个子问题,再拆分一次共可以变为(n-1)*(n-2)个子问题。
有了能把问题拆成互不相干的子问题作为前提,在并行时就并不需为数据频繁的加锁解锁,能比较顺利地完成并行的操作了。接下来是具体的实现手段:
首先在全局维护唯一一个二维表,所有子线程的读取与修改都是在这一张表上进行的。例程中就是dp变量。
接下来定义一个函数,用来求解每个子问题,这个时候出现了另一个问题:怎么样表示一个子问题。在动态规划算法中有一个状态压缩的概念,即为用一段二进制码表示城市的集合,例如共五个城市时表示{1, 3, 4}这个集合即为二进制"11010"。相同的思路,对于d(1,{2,3})问题,我们给函数输入起点,以及目标的集合,就可以完整的表示问题。函数内选择的是递归算法,因为非递归算法很难保证子问题互不相关(可能也只是我懒得想,有思路的同学可以在评论区中讨论一下)。具体的动态规划递归实现算法很多文章中都有,我就不过多赘述了。下面是这一部分的源代码:
*注意:我在写这个函数的时候规定的setMask是包含了起点城市的,也就是对于d(1,{2,3})这个问题传入函数的参数的是(1,0b"1110")。
int TSP( int x, int setMask ) { int mask, i, minimum; if ( dp[ setMask ][ x ] != -1 ) return dp[ setMask ][ x ]; /* if already have value */ mask = 1 << x; dp[ setMask ][ x ] = 1e9; /* set infinite */ for ( i = 0; i < n; ++i ) { if ( i != x && ( setMask & ( 1 << i ) ) && map[ i ][ x ] != 0 ) { /* if have path */ minimum = TSP( i, setMask - mask ) + map[ i ][ x ] ; dp[ setMask ][ x ] = min( dp[ setMask ][ x ], minimum); } } return dp[ setMask ][ x ]; }
现在我们有了表达子问题的方法,就该开始解决具体怎么拆开问题的算法了。问题的要求中说明了最大可以创建的线程数,也就是说为了保证尽可能高的效率,程序中应该将问题拆到数量刚好大过线程数的情况。
当我们计算出了应该拆分的层数后,就该为产生的一大波子问题生成描述了。
有了之前定义子问题动态规划的算法为基础,其实可以发现,对于同一个setMask,其产生的同辈问题数是很容易得出来的。就拿刚刚的例子d(1,{2,3})来说,它的setMask是"1110",起点是1号城市。然而,对于与这个问题互为同辈关系的d(2,{1,3}),d(3,{1,3})问题来说,他们的setMask也都是"1110",只不过起点依次是2和3号城市。所以只要生成出了这个集合,其对应的问题都能很容易表示,这也就把工作的重心转义到了如何生成这个集合,也就是例程中是setMask。
对于从同一个整道题求解的问题分离出来的子问题来说,其城市集合其实是一个组合数。我们还选取图2作为例子,现在经过计算后,我们需要把主问题拆分两层,即拆到d(2,{3})这一层。我们列出所有的集合,分别是"1100", "1010", "0110",发现了吗,其实这些集合也就是剔除了起点城市以后,将两个1与一个0进行组合的所有情况。这个规律同样可以推广到n个城市,如果要将问题拆分k层,所有的问题集合即是固定集合中对应初始点(即程序输入的s)为0后将n-k-1个1与k-1个0的所有组合数。数学表示为:
*注:公式中的${S \choose n-k-1}$表示S集合的n-k-1排列。
$Let\ S=\{0,1,2,\cdots,n-1\}\backslash\{s\}
\\
Then\ let\ C= {S \choose n-k-1}
\\
For\ every\ i \in C, its\ setmask=\overbrace{00\cdots00}^{n}+(1<<i[0])+(1<<i[1])+\cdots+(1<<i[n-k-1])$
希望我讲清楚了里面的数学关系,如果觉得我讲的有问题或者看不懂的同学可以在评论区留言……以下是上述数学过程的c语言代码,为了逻辑更清晰我将计算组合数和求解所有组合数封成了factorial和setCombination函数:
while (t > nn) { /* let all thread have work to do */ nn *= (nn - 1); nc += 1; /* extends layers until t > nn */ } j = 0; candiNum = malloc(sizeof(int) * (n - 1)); for (i = 0; i < n; ++i) { /* candiNum set means all cities exclude the start piont*/ if (i != s) { candiNum[j] = i; /* exclude the start point */ j++; } } combNum = factorial(nc, n - 1); /* calculate combination number */ combSet = malloc(sizeof(int) * combNum); combptr = combSet; numOf1 = n - 1 - nc; /* there should have `numOf1` of 1 in every item */ setCombination(candiNum, n - 1, numOf1);
接下来就是使用pthread创建线程来求解每一个子问题了。其中主函数作为调度线程来给0~t-1个线程依次分配任务。因为基本上可以认为每一个平行的子问题计算的时间是完全一样的,所以只需要先给0~t-1个线程分配0~t-1号任务,再等待到0号线程结束以后,为其分配t号任务,以此类推。当所有的子问题都被解决以后,dp这个二维表中已经有所有我们需要的子问题结论了。此时主问题的答案可以很快通过这些答案计算出来,也没有并行的必要了。最后提供整个程序的代码以及Makefile:
// file tsp.c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include "tsp.h" #include <time.h> #define min(a,b) (((a)<(b))?(a):(b)) /* macro func of min */ time_t tm; int ** map, ** dp, * combSet, * combptr, * posOf1, n; /* init vars */ trdArgs args[20]; pthread_t tid[20]; /* thread ids */ int TSP( int x, int setMask ) { int mask, i, minimum; if ( dp[ setMask ][ x ] != -1 ) return dp[ setMask ][ x ]; /* if already have value */ mask = 1 << x; dp[ setMask ][ x ] = 1e9; /* set infinite */ for ( i = 0; i < n; ++i ) { if ( i != x && ( setMask & ( 1 << i ) ) && map[ i ][ x ] != 0 ) { /* if have path */ minimum = TSP( i, setMask - mask ) + map[ i ][ x ] ; dp[ setMask ][ x ] = min( dp[ setMask ][ x ], minimum); } } return dp[ setMask ][ x ]; } void * trd(void * SArg) { TSP((*(trdArgs *)SArg).startPoint, (*(trdArgs *)SArg).setMask); return NULL; } void setCombination(int arr[], int n, int r) { /* set all combination into combptr */ int * data = malloc(sizeof(int) * r); /* Temporary array to store current combination */ combinationUtil(arr, data, 0, n - 1, 0, r); free(data); /* avoid leak */ } void combinationUtil(int arr[], int data[], int start, int end, int index, int r) { int i, j, tmpi = 0; /* init vars */ if (index == r) { for (j = 0; j < r; j++) /* add nums to binay */ tmpi |= (1 << data[j]); *combptr = tmpi; combptr++; /* next cell in array */ return; } for (i = start; i <= end && end - i + 1 >= r - index; i++) { /* recersive */ data[index] = arr[i]; combinationUtil(arr, data, i + 1, end, index + 1, r); /* call selfe */ } } long factorial(int m, int n) { /* caculate combination number */ int i, j; long ans = 1; if (m < n - m) m = n - m; /* C(m,n)=C(n-m,n) */ for (i = m + 1; i <= n; i++) ans *= i; for (j = 1; j <= n - m; j++) ans /= j; return ans; /* answer */ } int main() { long combNum; int t, s, i, j, k, nn, nc, ans, numOf1, * candiNum, * combSet, * tmptr; /* init vars */ nc = 0; scanf("%d %d %d", &t, &n, &s); /* get &t, &n, &s */ if (t < 0 || n <= 0 || n <= s || s < 0) {printf("-1\n"); return 0;} /* error input */ if (n == 1) {printf("0\n"); return 0;} nn = n - 1; /* first layer */ map = malloc(sizeof(int *)*n); /* alloc for 2dim array: map */ tmptr = malloc(sizeof(int) * n * n); for (i = 0; i < n; ++i) { map[i] = tmptr; /* set 2dim to 1dim */ tmptr += n; } dp = malloc(sizeof(int *) * (1 << n)); /* alloc for 2dim array: map */ tmptr = malloc(sizeof(int) * (1 << n) * n); for (i = 0; i < (1 << n); ++i) { dp[i] = tmptr; /* set 2dim to 1dim */ tmptr += n; } for (i = 0; i < n; i++) /* get the map */ for (j = 0; j < n; j++) scanf("%d", &map[i][j]); /* store */ memset(dp[0], -1, sizeof(int) * (1 << n) * n); /* fill all dp with -1 */ for ( i = 0; i < n; ++i ) dp[ 1 << i ][ i ] = map[ 0 ][ i ]; while (t > nn) { /* let all thread have work to do */ nn *= (nn - 1); nc += 1; /* extends layers until t > nn */ } j = 0; candiNum = malloc(sizeof(int) * (n - 1)); for (i = 0; i < n; ++i) { /* candiNum set means all cities exclude the start piont*/ if (i != s) { candiNum[j] = i; /* exclude the start point */ j++; } } combNum = factorial(nc, n - 1); /* calculate combination number */ combSet = malloc(sizeof(int) * combNum); combptr = combSet; numOf1 = n - 1 - nc; /* there should have `numOf1` of 1 in every item */ setCombination(candiNum, n - 1, numOf1); posOf1 = malloc(sizeof(int) * numOf1); /* arr to store the position of 1 in each comb */ k = 0; for (i = 0; i < combNum; ++i) { tmptr = posOf1; for (j = 0; j < n; ++j) { /* go through all bits */ if ((combSet[i] & (1 << j)) != 0) { *tmptr = j; tmptr++; /* point to next cell */ } } for (j = 0; j < numOf1; ++j) { /* arrange jobs */ args[k].id = k; /* set thread id arg */ args[k].startPoint = posOf1[j]; args[k].setMask = combSet[i]; /* set thread set mask args */ pthread_join(tid[k], NULL); pthread_create(&tid[k], NULL, trd, &args[k]); /* new thread */ k = (k + 1) % t; } } for (i = 0; i < t; ++i)/* join all thread */ pthread_join(tid[i], NULL); ans = TSP(0, ( 1 << n ) - 1); if (ans == 1e9) printf("-1\n"); else printf("%d\n", ans); /* final answer */ free(dp[0]); /* free */ free(dp); free(map[0]); /* free */ free(map); free(posOf1); /* free */ free(combSet); free(candiNum); /* free */ return 0; }
// file tsp.h typedef struct threadArgs { int id; int startPoint; int setMask; } trdArgs; int TSP( int x, int setMask ); void * trd(void * SArg) ; void combinationUtil(int arr[], int data[], int start, int end, int index, int r); void setCombination(int arr[], int n, int r) ; long factorial(int m, int n) ;
Makefile:
CC=gcc CFLAGS=-Wpedantic -Wall -Wextra -Werror -std=c89 -pthread -D_GNU_SOURCE tsp.o: tsp.c tsp.h ${CC} ${CFLAGS} tsp_blog.c -o tsp.o
欢迎没有完全看懂的读者随时提问,文中的例程也不一定是最好的算法,还请有想法的读者能不吝赐教。