Эх сурвалжийг харах

compare delete insert by scope , maintain last version as .bak

Fares 9 сар өмнө
parent
commit
c4d0e1a95d

+ 520 - 0
lib/handlers/file_upload_api copy.dart.bak

@@ -0,0 +1,520 @@
+import 'dart:io';
+import 'package:file_upload_processor/handlers/base_api.dart';
+import 'package:intl/intl.dart';
+import 'package:mime/mime.dart';
+import 'package:shelf/shelf.dart' as shelf;
+import 'package:path/path.dart' as path;
+import 'package:http_parser/http_parser.dart';
+import 'package:archive/archive.dart';
+import 'package:supabase/supabase.dart';
+
+class FileUploadApi extends BaseApi {
+  FileUploadApi(shelf.Request request) : super(request);
+  final uploadFolder = "./uploaded";
+  String get rawFolder => "$uploadFolder/raw";
+  String get dataFolder => "$uploadFolder/data";
+  String workingFolder = "";
+  String get zipFolder => "$rawFolder/$workingFolder";
+  String get extFolder => "$dataFolder/$workingFolder";
+
+  SupabaseClient getSupabaseClient(shelf.Request request) {
+    final supabaseUrl = request.headers['supabase-url'];
+    if (supabaseUrl == null) {
+      throw Exception('Supabase URL not provided in headers');
+    }
+
+    final authHeader = request.headers['authorization'];
+    if (authHeader == null || !authHeader.startsWith('Bearer ')) {
+      throw Exception('Invalid or missing Authorization Bearer token');
+    }
+
+    final token = authHeader.substring(7); // Remove 'Bearer ' prefix
+
+    return SupabaseClient(
+      supabaseUrl,
+      token,
+    );
+  }
+
+  String getTimestampedFilename(String originalFilename) {
+    final timestamp = DateFormat('yyyyMMdd_HHmmss').format(DateTime.now());
+    return '${timestamp}_$originalFilename';
+  }
+
+  Future<void> uploadToSupabase(
+      String filePath, String filename, SupabaseClient supabaseClient,
+      {bool timestamped = false,
+      required String bucket,
+      bool upsert = true}) async {
+    try {
+      final file = File(filePath);
+      final timestampedFilename =
+          timestamped ? getTimestampedFilename(filename) : filename;
+
+      await supabaseClient.storage.from(bucket).upload(
+            timestampedFilename,
+            file,
+            fileOptions: FileOptions(
+              cacheControl: '3600',
+              upsert: upsert,
+            ),
+          );
+
+      print('+File uploaded to <$bucket>: $timestampedFilename');
+    } catch (e) {
+      print('!Error uploading to Supabase: $e');
+      rethrow;
+    }
+  }
+
+  Future<void> initializeDirectories() async {
+    final directories = [
+      Directory(rawFolder),
+      Directory(dataFolder),
+      Directory(zipFolder),
+      Directory(extFolder),
+    ];
+
+    for (var dir in directories) {
+      if (!await dir.exists()) {
+        await dir.create(recursive: true);
+      }
+    }
+  }
+
+  bool isZipFile(List<int> bytes) {
+    if (bytes.length < 4) return false;
+    return bytes[0] == 0x50 &&
+        bytes[1] == 0x4B &&
+        bytes[2] == 0x03 &&
+        bytes[3] == 0x04;
+  }
+
+  Future<List<String>> processZipFile(String filePath) async {
+    List<String> files = [];
+    final bytes = await File(filePath).readAsBytes();
+    final archive = ZipDecoder().decodeBytes(bytes);
+
+    for (final file in archive) {
+      final filename = file.name;
+      if (file.isFile) {
+        final data = file.content as List<int>;
+        final outFile = File(path.join(extFolder, filename));
+        await outFile.parent.create(recursive: true);
+        await outFile.writeAsBytes(data);
+        files.add(path.join(extFolder, filename));
+      }
+    }
+    return files;
+  }
+
+  @override
+  response() async {
+    workingFolder = DateTime.now().millisecondsSinceEpoch.toString();
+
+    final supabaseClient = getSupabaseClient(request);
+    await initializeDirectories();
+
+    final contentType = request.headers['content-type'];
+    if (contentType == null ||
+        !contentType.toLowerCase().startsWith('multipart/form-data')) {
+      return shelf.Response.badRequest(
+          body: 'Content-Type must be multipart/form-data');
+    }
+
+    try {
+      final mediaType = MediaType.parse(contentType);
+      final boundary = mediaType.parameters['boundary'];
+      if (boundary == null) {
+        return shelf.Response.badRequest(body: 'Boundary not found');
+      }
+
+      final transformer = MimeMultipartTransformer(boundary);
+      final bodyBytes = await request.read().expand((e) => e).toList();
+      final stream = Stream.fromIterable([bodyBytes]);
+      final parts = await transformer.bind(stream).toList();
+
+      for (var part in parts) {
+        final contentDisposition = part.headers['content-disposition'];
+        if (contentDisposition == null) continue;
+
+        final filenameMatch =
+            RegExp(r'filename="([^"]*)"').firstMatch(contentDisposition);
+        if (filenameMatch == null) continue;
+
+        final filename = filenameMatch.group(1);
+        if (filename == null) continue;
+
+        final bytes = await part.fold<List<int>>(
+          [],
+          (prev, element) => [...prev, ...element],
+        );
+
+        final rawFilePath = path.join(zipFolder, filename);
+        await File(rawFilePath).writeAsBytes(bytes);
+
+        List<String> files = [];
+        if (isZipFile(bytes)) {
+          files.addAll(await processZipFile(rawFilePath));
+        } else {
+          final dataFilePath = path.join(extFolder, filename);
+          await File(rawFilePath).copy(dataFilePath);
+          files.add(dataFilePath);
+        }
+        bytes.clear();
+        //upload to supabase storage
+        await uploadToSupabase(rawFilePath, filename, supabaseClient,
+            bucket: 'csvhich', timestamped: false, upsert: true);
+        //upload to supabase storage archive timestamped
+        await uploadToSupabase(rawFilePath, filename, supabaseClient,
+            bucket: 'csvhich_archive', timestamped: true, upsert: false);
+        //insert data to supabase csvhichupdates
+        await supabaseClient
+            .from('csvhichupdates')
+            .insert({'filename': filename});
+
+        for (var file in files) {
+          final fileProcess = FileProcess(file, supabaseClient);
+          await fileProcess.go(donttouchdb: false);
+        }
+      }
+
+      return shelf.Response.ok('File processed and uploaded successfully');
+    } catch (e, stackTrace) {
+      //print('Error: $e\n$stackTrace');
+      return shelf.Response.internalServerError(
+          body: 'Error processing upload: $e');
+    } finally {
+      supabaseClient.dispose();
+      await File(zipFolder).delete(recursive: true);
+      await File(extFolder).delete(recursive: true);
+    }
+  }
+}
+
+class FileProcess {
+  FileProcess(this.filepath, this.supabase);
+  final String filepath;
+  final SupabaseClient supabase;
+  String get filename => filepath.replaceAll('\\', "/").split("/").last;
+
+  final Map<String, String> tables = {
+    "secondprgtype.txt": "aclegs_csv",
+    "ExportPGRGPNmois.txt": "pnlegs_csv",
+    "exportPGRGPN.txt": "pnlegs_csv",
+    "exportlicence.txt": "licences_csv",
+  };
+
+  final Map<String, List<String>> _headers = {
+    "secondprgtype.txt": [
+      "leg_no",
+      "fn_carrier",
+      "fn_number",
+      "fn_suffix",
+      "day_of_origin",
+      "ac_owner",
+      "ac_subtype",
+      "ac_version",
+      "ac_registration",
+      "dep_ap_actual",
+      "dep_ap_sched",
+      "dep_dt_est",
+      "dep_sched_dt",
+      "arr_ap_actual",
+      "arr_ap_sched",
+      "arr_dt_est",
+      "arr_sched_dt",
+      "slot_time_actual",
+      "leg_type",
+      "status",
+      "employer_cockpit",
+      "employer_cabin",
+      "cycles",
+      "delay_code_01",
+      "delay_code_02",
+      "delay_code_03",
+      "delay_code_04",
+      "delay_time_01",
+      "delay_time_02",
+      "delay_time_03",
+      "delay_time_04",
+      "subdelay_code_01",
+      "subdelay_code_02",
+      "subdelay_code_03",
+      "subdelay_code_04",
+      "pax_booked_c",
+      "pax_booked_y",
+      "pax_booked_trs_c",
+      "pax_booked_trs_y",
+      "pad_booked_c",
+      "pad_booked_y",
+      "offblock_dt_a",
+      "airborne_dt_a",
+      "landing_dt_a",
+      "onblock_dt_a",
+      "offblock_dt_f",
+      "airborne_dt_f",
+      "landing_dt_f",
+      "onblock_dt_f",
+      "offblock_dt_m",
+      "airborne_dt_m",
+      "landing_dt_m",
+      "onblock_dt_m",
+      "eet",
+    ],
+    "exportPGRGPN.txt": [
+      "date",
+      "tlc",
+      "actype",
+      "al",
+      "fnum",
+      "ddep",
+      "hdep",
+      "ddes",
+      "hdes",
+      "dep",
+      "des",
+      "label",
+      "type",
+    ],
+    "ExportPGRGPNmois.txt": [
+      "date",
+      "tlc",
+      "actype",
+      "al",
+      "fnum",
+      "ddep",
+      "hdep",
+      "ddes",
+      "hdes",
+      "dep",
+      "des",
+      "label",
+      "type",
+    ],
+    "exportlicence.txt": [
+      "tlc",
+      "fname",
+      "mname",
+      "lname",
+      "expire",
+      "ac",
+      "college",
+      "base",
+    ],
+  };
+  final Map<String, String> scopes = {
+    "secondprgtype.txt": "day_of_origin",
+    "exportPGRGPN.txt": "date",
+    "ExportPGRGPNmois.txt": "date",
+    "exportlicence.txt": "tlc",
+  };
+  final Map<String, List<String>> idToRemove = {
+    "secondprgtype.txt": ["day_of_origin"],
+    "exportPGRGPN.txt": ["date", "tlc"],
+    "ExportPGRGPNmois.txt": ["date", "tlc"],
+    "exportlicence.txt": ["tlc"],
+  };
+  final Map<String, Map<String, dynamic>> ids = {
+    "secondprgtype.txt": {
+      "table": "aclegs_log",
+      "headers": [
+        "day_of_origin",
+        "dep_sched_dt",
+        "fn_carrier",
+        "fn_number",
+        "dep_ap_sched",
+        "arr_ap_sched",
+        // "dep_ap_actual",
+        // "arr_ap_actual"
+      ]
+    },
+    "exportPGRGPN.txt": {
+      "table": "pnlegs_log",
+      "headers": ["tlc", "date", "dep", "des", "al", "fnum,", "label"]
+    },
+    "ExportPGRGPNmois.txt": {
+      "table": "pnlegs_log",
+      "headers": ["tlc", "date", "dep", "des", "al", "fnum,", "label"]
+    },
+    "exportlicence.txt": {
+      "table": "qualifs_log",
+      "headers": ["tlc", "college", "ac", "base"]
+    },
+  };
+
+  Future<List<Map<String, dynamic>>> parseCsv() async {
+    final headers = _headers[filename] ?? [];
+    if (headers.isEmpty) {
+      throw Exception('No headers found for file: $filename');
+    }
+
+    // Initialize an empty list to hold the parsed data
+    List<Map<String, dynamic>> data = [];
+
+    // Read the CSV file
+    final file = File(filepath);
+    final lines = await file.readAsLines();
+
+    // Iterate over each line in the CSV file
+    for (int i = 0; i < lines.length; i++) {
+      // Split the line into individual values
+      final values = lines[i].split(',');
+      if (values.length != headers.length) {
+        // print('Skipping line $i: Incorrect number of values: line: $i');
+        continue;
+      }
+
+      // Create a map for the current row
+      Map<String, dynamic> row = {};
+
+      // Assign each value to the corresponding header
+      for (int j = 0; j < headers.length; j++) {
+        row[headers[j]] = values[j].trim().removeQuotes.trim().nullIfEmpty;
+      }
+
+      // Add the row map to the data list
+      data.add(row);
+    }
+
+    // Return the parsed data
+    return data;
+  }
+
+  List<String> get filesTomonitor => _headers.keys.toList();
+  Future<void> go({bool donttouchdb = false}) async {
+    if (!filesTomonitor.contains(filename)) return;
+    final mapsToInsert = await parseCsv();
+    final scopeName = scopes[filename] ?? "";
+    final scopeInNew = mapsToInsert
+        .fold(<String>{}, (t, e) => t..add(e[scopeName] ?? "")).toList();
+
+    List<Map<String, dynamic>> oldIds = [];
+    List<Map<String, dynamic>> oldComparable = [];
+
+    //load old data
+    for (var e in chunkList(scopeInNew, 30)) {
+      final res = await supabase
+          .from(tables[filename]!)
+          .select()
+          .inFilter(scopeName, e)
+          .limit(300000);
+
+      oldIds.addAll(res.map((e) => {"id": e["id"]}));
+      oldComparable.addAll(res.map((e) => e..remove("id")));
+    }
+
+    final comparisonResult = compareLists(oldComparable, mapsToInsert);
+    final indexToRemove = comparisonResult.removeIndices;
+    final indexToMaintain = comparisonResult.maintainIndices;
+    final dataToInsert = comparisonResult.insertData;
+
+    try {
+      if (!donttouchdb)
+        for (var e in chunkList(
+            indexToRemove.map((f) => oldIds[f]['id']).toList(), 100)) {
+          await supabase
+              .from(tables[filename]!) // Replace with your actual table name
+              .delete()
+              .inFilter('id', e);
+        }
+
+      // insering new data
+      if (!donttouchdb)
+        await supabase
+            .from(tables[filename]!) // Replace with your actual table name
+            .insert(dataToInsert);
+    } catch (e, stackTrace) {
+      print('Error: $e\n$stackTrace');
+    }
+    print(
+        "         insert:${dataToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}");
+  }
+
+  ({
+    List<int> maintainIndices,
+    List<int> removeIndices,
+    // List<int> insertIndices
+    List<Map> insertData
+  }) compareLists(
+      List<Map<String, dynamic>> map1, List<Map<String, dynamic>> map2) {
+    List<int> maintainIndices = [];
+    List<int> removeIndices = [];
+    List<Map<String, dynamic>> insertData = map2;
+
+    // Find indices to maintain and remove in map1
+    for (int i = 0; i < map1.length; i++) {
+      final pos = insertData.findMap(map1[i]);
+      if (pos > -1) {
+        maintainIndices.add(i); // Item exists in both lists
+        insertData.removeAt(pos);
+      } else {
+        removeIndices.add(i); // Item does not exist in map2
+      }
+    }
+
+    return (
+      maintainIndices: maintainIndices,
+      removeIndices: removeIndices,
+      insertData: insertData
+    );
+  }
+
+  List<List<T>> chunkList<T>(List<T> list, int chunkSize) {
+    if (chunkSize <= 0) {
+      throw ArgumentError('chunkSize must be greater than 0');
+    }
+
+    List<List<T>> chunks = [];
+    for (var i = 0; i < list.length; i += chunkSize) {
+      chunks.add(list.sublist(
+          i, i + chunkSize > list.length ? list.length : i + chunkSize));
+    }
+    return chunks;
+  }
+}
+
+extension NullIfEmpty on String {
+  String? get nullIfEmpty => isEmpty ? null : this;
+}
+
+extension RemoveQuotes on String {
+  String get removeQuotes {
+    if (isEmpty) return this; // Return if the string is empty
+
+    // Remove the first and last characters if they are quotes
+    String result = this;
+    // Check if the first character is a quote
+    bool startsWithQuote = result.startsWith('"') || result.startsWith("'");
+    if (startsWithQuote) result = result.substring(1);
+    // Check if the last character is a quote
+    bool endsWithQuote = result.endsWith('"') || result.endsWith("'");
+    if (endsWithQuote) result = result.substring(0, result.length - 1);
+
+    return result;
+  }
+}
+
+bool mapsAreEqual(Map<String, dynamic> map1, Map<String, dynamic> map2) {
+  if (map1.length != map2.length) return false;
+  for (var key in map1.keys) {
+    if (map1[key] != map2[key]) return false;
+  }
+  return true;
+}
+
+extension ContainsMap on List<Map<String, dynamic>> {
+  bool containsMap(Map<String, dynamic> map) {
+    for (var item in this) {
+      if (mapsAreEqual(item, map)) return true;
+    }
+    return false;
+  }
+
+  int findMap(Map<String, dynamic> map) {
+    for (int i = 0; i < this.length; i++) {
+      if (mapsAreEqual(this.elementAt(i), map)) return i;
+    }
+    return -1;
+  }
+}

+ 32 - 30
lib/handlers/file_upload_api.dart

@@ -384,51 +384,53 @@ class FileProcess {
   List<String> get filesTomonitor => _headers.keys.toList();
   Future<void> go({bool donttouchdb = false}) async {
     if (!filesTomonitor.contains(filename)) return;
-    final mapsToInsert = await parseCsv();
+    final allmapsToInsert = await parseCsv();
     final scopeName = scopes[filename] ?? "";
-    final scopeInNew = mapsToInsert
+    final scopesInNew = allmapsToInsert
         .fold(<String>{}, (t, e) => t..add(e[scopeName] ?? "")).toList();
 
-    List<Map<String, dynamic>> oldIds = [];
-    List<Map<String, dynamic>> oldComparable = [];
+    for (var scopeInNew in scopesInNew) {
+      final mapsToInsert =
+          allmapsToInsert.where((e) => e[scopeName] == scopeInNew).toList();
+      List<Map<String, dynamic>> oldIds = [];
+      List<Map<String, dynamic>> oldComparable = [];
 
-    //load old data
-    for (var e in chunkList(scopeInNew, 30)) {
+      //load old data
       final res = await supabase
           .from(tables[filename]!)
           .select()
-          .inFilter(scopeName, e)
+          .eq(scopeName, scopeInNew)
           .limit(300000);
 
       oldIds.addAll(res.map((e) => {"id": e["id"]}));
       oldComparable.addAll(res.map((e) => e..remove("id")));
-    }
-
-    final comparisonResult = compareLists(oldComparable, mapsToInsert);
-    final indexToRemove = comparisonResult.removeIndices;
-    final indexToMaintain = comparisonResult.maintainIndices;
-    final dataToInsert = comparisonResult.insertData;
 
-    try {
-      if (!donttouchdb)
-        for (var e in chunkList(
-            indexToRemove.map((f) => oldIds[f]['id']).toList(), 100)) {
+      final comparisonResult = compareLists(oldComparable, mapsToInsert);
+      final indexToRemove = comparisonResult.removeIndices;
+      final indexToMaintain = comparisonResult.maintainIndices;
+      final dataToInsert = comparisonResult.insertData;
+
+      try {
+        if (!donttouchdb)
+          for (var e in chunkList(
+              indexToRemove.map((f) => oldIds[f]['id']).toList(), 100)) {
+            await supabase
+                .from(tables[filename]!) // Replace with your actual table name
+                .delete()
+                .inFilter('id', e);
+          }
+
+        // insering new data
+        if (!donttouchdb)
           await supabase
               .from(tables[filename]!) // Replace with your actual table name
-              .delete()
-              .inFilter('id', e);
-        }
-
-      // insering new data
-      if (!donttouchdb)
-        await supabase
-            .from(tables[filename]!) // Replace with your actual table name
-            .insert(dataToInsert);
-    } catch (e, stackTrace) {
-      print('Error: $e\n$stackTrace');
+              .insert(dataToInsert);
+      } catch (e, stackTrace) {
+        print('Error: $e\n$stackTrace');
+      }
+      print(
+          "   Scope:$scopeInNew      insert:${dataToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}");
     }
-    print(
-        "         insert:${dataToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}");
   }
 
   ({