在TinyFox中单线程执行耗时的任务

起因

    前一段时间在工作中,一个项目要用别的部门的插件,刚开始说多线程执行的,可以真到项目引用的时候,发现该插件是不支持多线程的,因为要监听并接收请求,还要放在服务器上,所以只能采用web项目,单因为web项目天生就是多线程,如何处理呢?

处理

  1. 因为项目简单,就是接收请求,处理请求
  2. 主要还是时间有限
  3. 采用TinyFox(独立版)做服务器软件,跨平台/轻量级,不用安装完整.Net环境,具体上代码    
public class Adapter
{
/// <summary>
/// 存放任务的队列
/// </summary>
public ConcurrentQueue<string> _queue;

/// <summary>
/// 任务工厂(通过自定义TaskScheduler,实现单线程执行任务)
/// </summary>
public TaskFactory fac;

/// <summary>
/// 配配器构造函数
/// </summary>
public Adapter()
{

/****************************************************
* JWS/TinyFox实例化此类型时会调用这个默认构造函数
* 所以,你可以在这儿写一些初始化代码
* 这个类只会被初始化一次
* ****************************************/
_queue = new ConcurrentQueue<string>();
fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(1)); //通过自定义任务调度器的构造函数限制线程数
}



/// <summary>
/// *** JWS或TinyFox所需要的关键函数 ***
/// <para>你的应用程序要与JWS/TinyFox服务器对接,必须提供这一个方法</para>
/// <para>服务器会将每个请求按OWIN规范打包成字典从这个方法传送给你</para>
/// </summary>
/// <param name="env">服务器按OWIN规范包装的“环境字典”,具体参见www.owin.org</param>
/// <returns>返回一个任务。当任务的状态翻转为完成、异常、取消等状态时,这个连接才会重新接受浏览器新的请求而进入下一轮会话</returns>
public Task OwinMain(IDictionary<string, object> env)
{
return ProcessRequest(env);
}


/// <summary>
/// 你自己对请求的处理函数
/// </summary>
/// <param name="env"></param>
/// <returns></returns>
private Task ProcessRequest(IDictionary<string, object> env)
{
var reqPath = env["owin.RequestPath"] as string;
// 返回null,表示这个路径不属OWIN处理,让JWS或TinyFox选择其它处理方式
if (reqPath != "/test" && reqPath != "/test/") return null;

var request = env["owin.RequestHeaders"] as IDictionary<string, string[]>;
var param = request["id"];
_queue.Enqueue(param[0]);
fac.StartNew(Handle);



// 从字典中获取向客户(浏览器)发送数据的“流”对象
var responseStream = env["owin.ResponseBody"] as Stream;

// 你准备发送的数据
const string outString = "hello world!";
var outBytes = Encoding.UTF8.GetBytes(outString);

// 从参数字典中获取Response HTTP头的字典对象
var responseHeaders = env["owin.ResponseHeaders"] as IDictionary<string, string[]>;

responseHeaders.Add("Content-Length", new[] { outBytes.Length.ToString() });
responseHeaders.Add("Content-Type", new[] { "text/plain; charset=utf-8" });

// 把正文写入流中,发送给浏览器
responseStream.Write(outBytes, 0, outBytes.Length);

return Task.FromResult<int>(0);
}

/// <summary>
/// 具体要执行的任务
/// </summary>
private void Handle()
{
string result;
if (!_queue.TryDequeue(out result))
{
//从队列取出要执行的数据,出对失败,记录一下
Console.WriteLine("queue empty");
}
//模拟真实情况,真实情况要几十秒
Thread.Sleep(1000);
//获取队列中数据和打印当前执行的线程id
Console.WriteLine("result id = {0} ,threadid = {1}", result, Thread.CurrentThread.ManagedThreadId);
}
}

前面只是接收请求放入队列中,真正干活是LimitedConcurrencyLevelTaskScheduler

public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;

private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

private readonly int _maxDegreeOfParallelism;

private int _delegatesQueuedOrRunning = 0;

public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1)
throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}

public int CurrentCount { get; set; }

protected sealed override void QueueTask(Task task)
{
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
private static object executeLock = new object();
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
_currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
lock (_tasks)
{
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
base.TryExecuteTask(item);
}
}
finally { _currentThreadIsProcessingItems = false; }
}, null);
}

protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (!_currentThreadIsProcessingItems)
return false;

if (taskWasPreviouslyQueued)
TryDequeue(task);

return base.TryExecuteTask(task);
}

protected sealed override bool TryDequeue(Task task)
{
lock (_tasks)
return _tasks.Remove(task);
}

public sealed override int MaximumConcurrencyLevel
{
get
{
return _maxDegreeOfParallelism;
}
}

protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken)
return _tasks.ToArray();
else
throw new NotSupportedException();
}
finally
{
if (lockTaken)
Monitor.Exit(_tasks);
}
}
}

1.虽然用任务调度器但最终还是用线程池来做的任务,只是通过构造函数线程实现了单线程一直在执行

2.说说为什么要用TinyFox,是因为服务器上没有.Net 4.0环境,不想麻烦

3.使用ConcurrentQueue是一个高效的线程安全的队列

4. Adapter初始化一些数据,上面已经注释了