Go語言在文件處理和併發方面有天然優勢,下面是一個完整的實現方案:
1. 基礎文件處理工具框架
package main
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
)
// FileProcessor 文件處理器接口
type FileProcessor interface {
Process(filePath string) error
Name() string
}
// FileTool 文件處理工具
type FileTool struct {
processors []FileProcessor
concurrent bool
workers int
}
// NewFileTool 創建新的文件處理工具
func NewFileTool(concurrent bool, workers int) *FileTool {
if workers <= 0 {
workers = 10
}
return &FileTool{
concurrent: concurrent,
workers: workers,
}
}
// RegisterProcessor 註冊處理器
func (ft *FileTool) RegisterProcessor(processor FileProcessor) {
ft.processors = append(ft.processors, processor)
}
// ProcessFiles 處理文件
func (ft *FileTool) ProcessFiles(paths []string) error {
start := time.Now()
if ft.concurrent {
return ft.processConcurrently(paths)
} else {
return ft.processSequentially(paths)
}
fmt.Printf("處理完成,耗時: %v\n", time.Since(start))
return nil
}
2. 併發處理實現
// processConcurrently 併發處理文件
func (ft *FileTool) processConcurrently(paths []string) error {
var wg sync.WaitGroup
fileChan := make(chan string, len(paths))
errorChan := make(chan error, len(paths))
// 啓動worker
for i := 0; i < ft.workers; i++ {
wg.Add(1)
go ft.worker(fileChan, errorChan, &wg)
}
// 發送文件路徑到channel
go func() {
for _, path := range paths {
fileChan <- path
}
close(fileChan)
}()
// 等待所有worker完成
go func() {
wg.Wait()
close(errorChan)
}()
// 收集錯誤
var errors []error
for err := range errorChan {
if err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return fmt.Errorf("處理過程中發生 %d 個錯誤", len(errors))
}
return nil
}
// worker 工作協程
func (ft *FileTool) worker(fileChan <-chan string, errorChan chan<- error, wg *sync.WaitGroup) {
defer wg.Done()
for filePath := range fileChan {
if err := ft.processSingleFile(filePath); err != nil {
errorChan <- fmt.Errorf("文件 %s 處理失敗: %v", filePath, err)
}
}
}
3. 順序處理實現
// processSequentially 順序處理文件
func (ft *FileTool) processSequentially(paths []string) error {
for _, path := range paths {
if err := ft.processSingleFile(path); err != nil {
return fmt.Errorf("文件 %s 處理失敗: %v", path, err)
}
}
return nil
}
// processSingleFile 處理單個文件
func (ft *FileTool) processSingleFile(filePath string) error {
info, err := os.Stat(filePath)
if err != nil {
return err
}
if info.IsDir() {
return ft.processDirectory(filePath)
}
for _, processor := range ft.processors {
if err := processor.Process(filePath); err != nil {
return err
}
}
return nil
}
// processDirectory 處理目錄
func (ft *FileTool) processDirectory(dirPath string) error {
return filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
for _, processor := range ft.processors {
if err := processor.Process(path); err != nil {
return err
}
}
}
return nil
})
}
4. 具體處理器實現示例
// LineCounter 行數統計處理器
type LineCounter struct{}
func (lc *LineCounter) Process(filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
fmt.Printf("文件 %s 有 %d 行\n", filePath, lineCount)
return scanner.Err()
}
func (lc *LineCounter) Name() string {
return "行數統計器"
}
// FileCopier 文件複製處理器
type FileCopier struct {
destDir string
}
func (fc *FileCopier) Process(filePath string) error {
srcFile, err := os.Open(filePath)
if err != nil {
return err
}
defer srcFile.Close()
destPath := filepath.Join(fc.destDir, filepath.Base(filePath))
destFile, err := os.Create(destPath)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, srcFile)
if err != nil {
return err
}
fmt.Printf("文件 %s 已複製到 %s\n", filePath, destPath)
return nil
}
func (fc *FileCopier) Name() string {
return "文件複製器"
}
// ContentReplacer 內容替換處理器
type ContentReplacer struct {
oldStr string
newStr string
}
func (cr *ContentReplacer) Process(filePath string) error {
content, err := os.ReadFile(filePath)
if err != nil {
return err
}
newContent := []byte(string(content))
if cr.oldStr != "" {
newContent = []byte(strings.ReplaceAll(string(content), cr.oldStr, cr.newStr))
}
// 備份原文件
backupPath := filePath + ".bak"
if err := os.Rename(filePath, backupPath); err != nil {
return err
}
if err := os.WriteFile(filePath, newContent, 0644); err != nil {
// 恢復備份
os.Rename(backupPath, filePath)
return err
}
// 刪除備份
os.Remove(backupPath)
fmt.Printf("文件 %s 內容已替換\n", filePath)
return nil
}
func (cr *ContentReplacer) Name() string {
return "內容替換器"
}
5. 使用示例
func main() {
// 創建文件處理工具(啓用併發,10個worker)
fileTool := NewFileTool(true, 10)
// 註冊處理器
fileTool.RegisterProcessor(&LineCounter{})
fileTool.RegisterProcessor(&FileCopier{destDir: "./backup"})
fileTool.RegisterProcessor(&ContentReplacer{
oldStr: "old_text",
newStr: "new_text",
})
// 處理文件
files := []string{
"file1.txt",
"file2.txt",
"directory/",
}
if err := fileTool.ProcessFiles(files); err != nil {
fmt.Printf("處理失敗: %v\n", err)
}
}
6. 性能優化技巧
// 使用緩衝IO提高讀寫性能
type BufferedFileProcessor struct {
bufferSize int
}
func (bfp *BufferedFileProcessor) Process(filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// 使用帶緩衝的reader
reader := bufio.NewReaderSize(file, bfp.bufferSize)
// 處理邏輯...
return nil
}
// 批量處理減少系統調用
type BatchProcessor struct {
batchSize int
}
func (bp *BatchProcessor) ProcessFilesBatch(files []string) error {
// 批量處理邏輯...
return nil
}
關鍵特性總結
- 併發處理:利用goroutine和channel實現高效的併發文件處理
- 可擴展架構:通過接口設計支持多種文件處理操作
- 錯誤處理:完善的錯誤處理和恢復機制
- 性能優化:緩衝IO、批量操作等優化手段
- 靈活性:支持文件和目錄的遞歸處理
這個框架可以根據具體需求進行擴展,添加更多的文件處理功能。