↓推荐关注↓
现代计算机系统每天都会产生大量的数据,随着系统规模的增大,把产生的所有调试数据存储到数据库是不可行的,因为它们产生以后就不会被改变,只是用来分析和解决故障。因此把它存储在本地磁盘上是一个很好的办法。
在这我们打算使用GOLANG读取一个16GB大小,上百万行内容的txt或者log文件。跟我一起来吧。
LET‘s CODE
首先我们使用GO标准库中的os.File来打开文件
f, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
// UPDATE: close after checking error
defer file.Close() //Do not forget to close the file
当文件被打开以后,我们有两个选择
-
逐行读取,这能帮助我们减少内存的使用,但会耗费大量的时间在IO上。 -
一次性读取整个文件到内存中进行处理,这将显著的增加内存使用,但是会节省大量的时间。
但是要注意,我们的文件大小是16GB,因此我们不可能把它一次性的加载到内存中。但是第一个选择也不适合,因为我们想在几秒内处理完成。
因此,仔细想想,我们也许还有第三个选择,我们不去一次性读取整个文件到内存中,而是分段读取,想想看GO的标准库中是不是有个bufio.NewReader()呢?
r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
buf = buf[:n]
if n == 0 {
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
break
}
return err
}
}
一旦我们有个一个分块,我们就可以开启一个goroutine去处理它。因此上面的代码可以做如下修改。
//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 500*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
slicePool := sync.Pool{New: func() interface{} {
lines := make([]string, 100)
return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
break
}
return err
}
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
//process each chunk concurrently
//start -> log start time, end -> log end time
ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
wg.Done()
}()
}
wg.Wait()
}
上面的代码做了两个优化:
-
sync.Pool可以减轻GC的压力,我们可以重复使用已分配的内存,减少内存消耗,加快处理速度。 -
goroutine帮助我们并发处理多个切块,显著加快处理速度。
接下来我们就来实现ProcessChunk函数,来处理如下格式的日志文件
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
我们将根据命令行提供的时间戳来提取日志
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow
noOfThread++
}
length := len(logsSlice)
//traverse the chunk
for i := 0; i < length; i += chunkSize {
wg2.Add(1)
//process each chunk in saperate chunk
go func(s int, e int) {
for i:= s; i<e;i++{
text := logsSlice[i]
if len(text) == 0 {
continue
}
logParts := strings.SplitN(text, ",", 2)
logCreationTimeString := logParts[0]
logCreationTime, err := time.Parse("2006-01- 02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
return
}
// check if log's timestamp is inbetween our desired period
if logCreationTime.After(start) && logCreationTime.Before(end) {
fmt.Println(text)
}
}
textSlice = nil
wg2.Done()
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
//passing the indexes for processing
}
wg2.Wait() //wait for a chunk to finish
logsSlice = nil
}
使用上面的代码来打开处理16GB的日志文件,测试花费时间大概是25s。
转自:寒城
链接:zhuanlan.zhihu.com/p/184937550
- EOF -
看完本文有收获?请分享给更多人
推荐关注「Linux 爱好者」,提升Linux技能
点赞和在看就是最大的支持❤️
文章评论