// Open the log file named by fileName; on failure report and bail out.
f, err := os.Open(fileName)
if err != nil {
	fmt.Println("cannot able to read the file", err)
	return
}
// BUG FIX: the original deferred file.Close(), but the handle opened here
// is named f — `file` is undefined in this snippet. Close the handle we
// actually opened, and only after the error check so we never Close a nil
// *os.File.
defer f.Close() // do not forget to close the file
// NOTE(review): this fragment is the interior of the chunk-reading loop; its
// enclosing function, and the closing braces of the `for {` and `go func() {`
// opened below, are not visible in this excerpt — the code is therefore left
// byte-identical and only annotated.
// NOTE(review): pooling a plain string (stringPool below) is ineffective in
// Go — strings are immutable values, so Put/Get of a string reuses no memory
// and only boxes it into an interface. Pool []byte or *strings.Builder instead.
//sync pools to reuse the memory and decrease the preassure on Garbage Collector linesPool := sync.Pool{New: func()interface{} { lines := make([]byte, 500*1024) return lines }} stringPool := sync.Pool{New: func()interface{} { lines := "" return lines }} slicePool := sync.Pool{New: func()interface{} { lines := make([]string, 100) return lines }} r := bufio.NewReader(f) var wg sync.WaitGroup //wait group to keep track off all threads for {
// NOTE(review): in the n == 0 branch, `if err == io.EOF { break }` is
// unreachable — the preceding `if err != nil` already breaks on io.EOF.
// The trailing `return err` also presumably requires the (unseen) enclosing
// function to return error — confirm against the full listing.
buf := linesPool.Get().([]byte) n, err := r.Read(buf) buf = buf[:n] if n == 0 { if err != nil { fmt.Println(err) break } if err == io.EOF { break } return err } nextUntillNewline, err := r.ReadBytes('\n')//read entire line
// If the 500 KiB read ended mid-line, extend the chunk up to the next '\n'
// so no log line is split across two chunks.
if err != io.EOF { buf = append(buf, nextUntillNewline...) }
// NOTE(review): `gofunc()` below lost a space in transcription — it must
// read `go func() {` to compile.
wg.Add(1) gofunc() {
//process each chunk concurrently //start -> log start time, end -> log end time
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
我们将根据命令行提供的时间戳提取日志。
funcProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) { //another wait group to process every chunk further var wg2 sync.WaitGroup logs := stringPool.Get().(string) logs = string(chunk) linesPool.Put(chunk) //put back the chunk in pool //split the string by "\n", so that we have slice of logs logsSlice := strings.Split(logs, "\n") stringPool.Put(logs) //put back the string pool chunkSize := 100//process the bunch of 100 logs in thread n := len(logsSlice) noOfThread := n / chunkSize if n%chunkSize != 0 { //check for overflow noOfThread++ } length := len(logsSlice) //traverse the chunk for i := 0; i < length; i += chunkSize {
wg2.Add(1) //process each chunk in saperate chunk gofunc(s int, e int) { for i:= s; i<e;i++{ text := logsSlice[i] iflen(text) == 0 { continue }
logParts := strings.SplitN(text, ",", 2) logCreationTimeString := logParts[0] logCreationTime, err := time.Parse("2006-01- 02T15:04:05.0000Z", logCreationTimeString) if err != nil { fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text) return } // check if log's timestamp is inbetween our desired period if logCreationTime.After(start) && logCreationTime.Before(end) {
fmt.Println(text) } } textSlice = nil wg2.Done()
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice))))) //passing the indexes for processing } wg2.Wait() //wait for a chunk to finish logsSlice = nil }
对上面的代码进行基准测试。以 16 GB 的日志文件为例,提取日志所需的时间约为 25 秒。
完整的代码示例如下:
funcmain() {
s := time.Now() args := os.Args[1:] iflen(args) != 6 { // for format LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location" fmt.Println("Please give proper command line arguments") return } startTimeArg := args[1] finishTimeArg := args[3] fileName := args[5]
file, err := os.Open(fileName)
if err != nil { fmt.Println("cannot able to read the file", err) return }
defer file.Close() //close after checking err
queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg) if err != nil { fmt.Println("Could not able to parse the start time", startTimeArg) return }
queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg) if err != nil { fmt.Println("Could not able to parse the finish time", finishTimeArg) return }
filestat, err := file.Stat() if err != nil { fmt.Println("Could not able to get the file stat") return }
lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString) if err != nil { fmt.Println("can not able to parse time : ", err) }
if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) { Process(file, queryStartTime, queryFinishTime) }
fmt.Println("\nTime taken - ", time.Since(s)) }
// NOTE(review): this listing of Process is truncated — the goroutine close,
// the outer-loop close, wg2.Wait and the `return` required by the `error`
// result all lie beyond the visible excerpt, so the code is left byte-identical
// and only annotated.
// NOTE(review): `funcProcess` lost a space in transcription (must be
// `func Process`), and `logsSlice`, `wg2` are used below without being
// declared anywhere in view — this body appears to duplicate ProcessChunk's
// internals rather than the file-reading loop its signature suggests; confirm
// against the original article's full listing.
funcProcess(f *os.File, start time.Time, end time.Time)error {
// Batch size for the per-goroutine slices of logsSlice.
chunkSize := 300 n := len(logsSlice) noOfThread := n / chunkSize
// Round the thread count up so a final partial batch is not dropped.
if n%chunkSize != 0 { noOfThread++ }
for i := 0; i < (noOfThread); i++ {
// NOTE(review): `gofunc` and `iflen` below are also fused ("go func",
// "if len"); "avaoid" should read "avoid".
wg2.Add(1) gofunc(s int, e int) { defer wg2.Done() //to avaoid deadlocks for i := s; i < e; i++ { text := logsSlice[i] iflen(text) == 0 { continue } logSlice := strings.SplitN(text, ",", 2) logCreationTimeString := logSlice[0]
logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString) if err != nil { fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text) return }
// NOTE(review): the matching-line output is commented out here, so this
// variant filters but prints nothing — presumably for benchmarking; verify.
if logCreationTime.After(start) && logCreationTime.Before(end) { //fmt.Println(text) } }
文章评论